diff --git a/configure.ac b/configure.ac index 0cef6247..604e760f 100644 --- a/configure.ac +++ b/configure.ac @@ -69,6 +69,8 @@ AX_BOOST_BASE([1.36]) AX_BOOST_SYSTEM AX_BOOST_FILESYSTEM +AC_CHECK_FUNCS([inotify_init], [AC_DEFINE([HAVE_INOTIFY], [1], [Check for libinotify])]) + # Generates Makefile from Makefile.am. Modify when new subdirs are added. # Change Makefile.am also to add subdirectly. AC_CONFIG_FILES(Makefile src/Makefile lib/py/Makefile) diff --git a/src/Makefile.am b/src/Makefile.am index f6227ede..123879a7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -104,7 +104,7 @@ endif # Binaries -- multiple progs can be defined. bin_PROGRAMS = scribed -scribed_SOURCES = source.cpp store.cpp store_queue.cpp SourceConf.cpp conf.cpp file.cpp conn_pool.cpp scribe_server.cpp network_dynamic_config.cpp dynamic_bucket_updater.cpp url.cpp $(FB_SOURCES) $(ENV_SOURCES) +scribed_SOURCES = source.cpp store.cpp store_queue.cpp SourceConf.cpp conf.cpp file.cpp conn_pool.cpp scribe_server.cpp network_dynamic_config.cpp dynamic_bucket_updater.cpp url.cpp PathWatcher.cpp $(FB_SOURCES) $(ENV_SOURCES) if USE_SCRIBE_HDFS scribed_SOURCES += HdfsFile.cpp endif @@ -118,11 +118,14 @@ if SHARED scribed_DEPENDENCIES = libscribe.so endif -TESTS = url_test +TESTS = url_test TestPathWatcher check_PROGRAMS = $(TESTS) url_test_SOURCES = url.h url.cpp url_test.cpp url_test_CXXFLAGS = $(CPPUNIT_CFLAGS) url_test_LDFLAGS = $(CPPUNIT_LIBS) +TestPathWatcher_SOURCES = PathWatcher.h PathWatcher.cpp TestPathWatcher.cpp +TestPathWatcher_CXXFLAGS = $(CPPUNIT_CFLAGS) +TestPathWatcher_LDFLAGS = $(CPPUNIT_LIBS) # Section 4 ############################################################################## # Set up Thrift specific activity here. diff --git a/src/PathWatcher.cpp b/src/PathWatcher.cpp new file mode 100644 index 00000000..36ff5236 --- /dev/null +++ b/src/PathWatcher.cpp @@ -0,0 +1,156 @@ +/** + * Copyright 2010 Twitter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author John Corwin + */ + +#include "PathWatcher.h" + +#include + +#ifdef HAVE_INOTIFY +#include +#define EVENT_BUF_LEN (1024 * (sizeof(event) + 16)) +#define FILE_WATCH_EVENTS (IN_MODIFY | IN_DELETE_SELF | IN_MOVE_SELF) +#define DIR_WATCH_EVENTS (IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MOVE_SELF | IN_MOVED_FROM) +#endif + +PathWatcher::PathWatcher() : inotify_file_wd(-1), inotify_dir_wd(-1), active(true) { +#ifdef HAVE_INOTIFY + pthread_mutex_init(&watchMutex, NULL); + inotify_fd = inotify_init(); + if (inotify_fd < 0) { + LOG_OPER("inotify_init failed: %s", strerror(errno)); + } +#endif +} + +PathWatcher::~PathWatcher() { +#ifdef HAVE_INOTIFY + if (inotify_fd >= 0) { + close(inotify_fd); + } + pthread_mutex_destroy(&watchMutex); +#endif +} + +// Clear any existing watched file or directory. +void PathWatcher::clearWatches() { +#ifdef HAVE_INOTIFY + pthread_mutex_lock(&watchMutex); + if (inotify_file_wd >= 0) { + LOG_OPER("Deleting existing file watch"); + inotify_rm_watch(inotify_fd, inotify_file_wd); + inotify_file_wd = -1; + } + + if (inotify_dir_wd >= 0) { + LOG_OPER("Deleting existing directory watch"); + inotify_rm_watch(inotify_fd, inotify_dir_wd); + inotify_dir_wd = -1; + } + watchedFile.clear(); + pthread_mutex_unlock(&watchMutex); +#endif +} + +bool PathWatcher::tryWatchFile(const std::string & path) { + bool watched = false; + +#ifdef HAVE_INOTIFY + clearWatches(); + boost::filesystem::path watchedPath(path); + + pthread_mutex_lock(&watchMutex); + inotify_file_wd = inotify_add_watch(inotify_fd, path.c_str(), FILE_WATCH_EVENTS); + if (inotify_file_wd >= 0) { + std::string parentDir = watchedPath.parent_path().string(); + inotify_dir_wd = inotify_add_watch(inotify_fd, parentDir.c_str(), DIR_WATCH_EVENTS); + LOG_OPER("Set inotify watch for file %s with parent directory %s", path.c_str(), parentDir.c_str()); + watchedFile = watchedPath.filename(); + watched = true; + } + pthread_mutex_unlock(&watchMutex); +#endif + + return watched; +} + +bool PathWatcher::tryWatchDirectory(const std::string & path) { + bool watched = false; + +#ifdef HAVE_INOTIFY + clearWatches(); + LOG_OPER("Attempting to watch path %s", path.c_str()); + pthread_mutex_lock(&watchMutex); + inotify_dir_wd = inotify_add_watch(inotify_fd, path.c_str(), DIR_WATCH_EVENTS); + if (inotify_dir_wd >= 0) { + LOG_OPER("Watching path %s", path.c_str()); + watched = true; + } + pthread_mutex_unlock(&watchMutex); +#endif + + return watched; +} + +void PathWatcher::waitForEvent(bool & fileEvent, bool & rewatch) { +#ifdef HAVE_INOTIFY + rewatch = false; + fileEvent = false; + + char eventBuf[EVENT_BUF_LEN]; + if (!active) { + return; + } + int rv = read(inotify_fd, eventBuf, EVENT_BUF_LEN); + if (rv < 0) { + LOG_OPER("Failed to read inotify event: %s", strerror(errno)); + rewatch = true; + return; + } + int i = 0; + while (i < rv) { + struct inotify_event *event; + event = (struct inotify_event *) &eventBuf[i]; + if (inotify_file_wd != -1 && inotify_file_wd == event->wd) { + // File event + fileEvent = true; + } else if (inotify_file_wd != -1 && inotify_dir_wd == event->wd) { + // Directory event with a watched file + std::string alteredFile(event->name); + if (alteredFile == watchedFile) { + rewatch = true; + } + } else if (inotify_dir_wd == event->wd) { + // Directory event with no existing file + rewatch = true; + } + i += sizeof(inotify_event) + event->len; + } +#else + // inotify is not available. Sleep for one second before checking the file. + rewatch = false; + fileEvent = true; + sleep(1); +#endif +} + +void PathWatcher::shutdown() { +#ifdef HAVE_INOTIFY + active = false; + clearWatches(); +#endif +} diff --git a/src/PathWatcher.h b/src/PathWatcher.h new file mode 100644 index 00000000..75cae011 --- /dev/null +++ b/src/PathWatcher.h @@ -0,0 +1,55 @@ +/** + * Copyright 2010 Twitter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author John Corwin + */ + +#ifndef PATH_WATCHER_H + +#include "common.h" + +class PathWatcher { + public: + PathWatcher(); + ~PathWatcher(); + + // Attempt to watch a file. Return true if successful. + bool tryWatchFile(const std::string & path); + + // Attempt to watch a directory. Return true if successful. + bool tryWatchDirectory(const std::string & path); + + /** + * Wait for events. + * fileEvent will be set if the watched file was modified. + * rewatch will be set if a change to the file or parent directory + * requires rewatching. + */ + void waitForEvent(bool & fileEvent, bool & rewatch); + + // Interrupts the thread waiting for events. + void shutdown(); + + private: + void clearWatches(); + int inotify_fd; + int inotify_file_wd; + int inotify_dir_wd; + bool volatile active; + pthread_mutex_t watchMutex; + std::string watchedFile; +}; + +#endif diff --git a/src/TestPathWatcher.cpp b/src/TestPathWatcher.cpp new file mode 100644 index 00000000..1430d6fc --- /dev/null +++ b/src/TestPathWatcher.cpp @@ -0,0 +1,163 @@ +/** + * Copyright 2010 Twitter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author John Corwin + */ + +#include "PathWatcher.h" + +#include +#include +#include + +class TestPathWatcher : public CppUnit::TestCase { +public: + CPPUNIT_TEST_SUITE(TestPathWatcher); + CPPUNIT_TEST(testFileModified); + CPPUNIT_TEST(testFileCreated); + CPPUNIT_TEST(testFileDeleted); + CPPUNIT_TEST(testFileMoved); + CPPUNIT_TEST_SUITE_END(); + +public: + std::string tempDir; + + TestPathWatcher() { + std::ostringstream dir; + time_t ts = time(NULL); + dir << "/tmp/scribe-test-" << ts; + tempDir = dir.str(); + mkdir(tempDir.c_str(), S_IRWXU | S_IRWXG); + } + + void testFileModified() { + // Create and watch a file. + std::string file = tempDir + "/test.txt"; + std::fstream f(file.c_str(), std::ios_base::out); + f << "line1" << std::endl; + PathWatcher pathWatcher; + std::cout << "Attempting to watch " << file << std::endl; + CPPUNIT_ASSERT(pathWatcher.tryWatchFile(file)); + + // Modify the file. + f << "line2" << std::endl; + + // Make sure we're notified. + bool fileEvent, rewatch; + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(fileEvent); + CPPUNIT_ASSERT(!rewatch); + f.close(); + unlink(file.c_str()); + } + + void testFileCreated() { + std::string file = tempDir + "/test.txt"; + PathWatcher pathWatcher; + + // Watch a non-existing file. + std::cout << "Attempting to watch " << file << std::endl; + CPPUNIT_ASSERT(!pathWatcher.tryWatchFile(file)); + CPPUNIT_ASSERT(pathWatcher.tryWatchDirectory(tempDir)); + + // Create the file. + std::fstream f(file.c_str(), std::ios_base::out); + f << "line1" << std::endl; + bool fileEvent, rewatch; + + // Make sure we're notified of the file creation. + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(!fileEvent); + CPPUNIT_ASSERT(rewatch); + + // Make sure we're notified when the file changes. + CPPUNIT_ASSERT(pathWatcher.tryWatchFile(file)); + f << "line2" << std::endl; + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(fileEvent); + f.close(); + unlink(file.c_str()); + } + + void testFileDeleted() { + std::string file = tempDir + "/test.txt"; + + // Create and watch a file. + std::fstream f(file.c_str(), std::ios_base::out); + f << "line1" << std::endl; + PathWatcher pathWatcher; + std::cout << "Attempting to watch " << file << std::endl; + CPPUNIT_ASSERT(pathWatcher.tryWatchFile(file)); + + // Delete the file. + unlink(file.c_str()); + + // Make sure we're notified. + bool fileEvent, rewatch; + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(!fileEvent); + CPPUNIT_ASSERT(rewatch); + f.close(); + } + + void testFileMoved() { + // Create and watch a file 'test.txt'. + std::string file = tempDir + "/test.txt"; + std::fstream f(file.c_str(), std::ios_base::out); + f << "line1" << std::endl; + f.close(); + + PathWatcher pathWatcher; + std::cout << "Attempting to watch " << file << std::endl; + CPPUNIT_ASSERT(pathWatcher.tryWatchFile(file)); + + // Move 'test.txt' to 'moved.txt'. + std::string movedFile = tempDir + "/moved.txt"; + rename(file.c_str(), movedFile.c_str()); + bool fileEvent, rewatch; + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(fileEvent); + CPPUNIT_ASSERT(rewatch); + CPPUNIT_ASSERT(!pathWatcher.tryWatchFile(file)); + CPPUNIT_ASSERT(pathWatcher.tryWatchDirectory(tempDir)); + + // Move 'moved.txt' back to 'test.txt'. + rename(movedFile.c_str(), file.c_str()); + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(!fileEvent); + CPPUNIT_ASSERT(rewatch); + CPPUNIT_ASSERT(pathWatcher.tryWatchFile(file)); + + // Make sure we still get notified on change to 'test.txt'. + std::fstream f2(file.c_str(), std::ios_base::out); + f2 << "line2" << std::endl; + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(fileEvent); + CPPUNIT_ASSERT(!rewatch); + f2.close(); + unlink(file.c_str()); + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(TestPathWatcher); + +int main(int argc, char **argv) +{ + CppUnit::TextUi::TestRunner runner; + CppUnit::TestFactoryRegistry ®istry = CppUnit::TestFactoryRegistry::getRegistry(); + runner.addTest( registry.makeTest() ); + runner.run(); + return 0; +} diff --git a/src/source.cpp b/src/source.cpp index e74d5614..12116b72 100644 --- a/src/source.cpp +++ b/src/source.cpp @@ -64,7 +64,6 @@ void Source::stop() {} void Source::run() {} - TailSource::TailSource(ptree& configuration) : Source(configuration) {} TailSource::~TailSource() {} @@ -76,7 +75,44 @@ void TailSource::configure() { LOG_OPER("[%s] Invalid TailSource configuration! No specified.", categoryHandled.c_str()); validConfiguration = false; + } else { + if (!watchPath()) { + validConfiguration = false; + } + } +} + +bool TailSource::watchPath() { + if (pathWatcher.tryWatchFile(filename)) { + return true; } + // File watch failed. Try watching each parent directory. + LOG_OPER("Unable to watch %s. Attempting to watch parent directories.", filename.c_str()); + boost::filesystem::path fullPath(filename); + deque pathStack; + boost::filesystem::path::iterator pathIter = fullPath.begin(); + while (pathIter != fullPath.end()) { + pathStack.push_back(*pathIter); + pathIter++; + } + // We already failed to watch the file. Remove it from the path stack. + pathStack.pop_back(); + + // Attempt to watch a parent path of the file. + while (!pathStack.empty()) { + boost::filesystem::path pathToWatch; + for (deque::iterator elem = pathStack.begin(); elem != pathStack.end(); elem++) { + pathToWatch /= *elem; + } + string strPathToWatch = pathToWatch.string(); + LOG_OPER("Attempting to watch path %s", strPathToWatch.c_str()); + if (pathWatcher.tryWatchDirectory(strPathToWatch)) { + return true; + } + pathStack.pop_back(); + } + LOG_OPER("Failed to watch any parent paths of %s", filename.c_str()); + return false; } void TailSource::start() { @@ -86,6 +122,8 @@ void TailSource::start() { void TailSource::stop() { active = false; + LOG_OPER("Shutting down TailSource thread for %s", filename.c_str()); + pathWatcher.shutdown(); pthread_join(sourceThread, NULL); } @@ -112,6 +150,16 @@ void TailSource::run() { vector messages; while (active) { + bool fileEvent; + bool rewatch; + pathWatcher.waitForEvent(fileEvent, rewatch); + if (rewatch) { + watchPath(); + } + if (!fileEvent) { + continue; + } + stat(filename.c_str(), ¤tStat); // Files sometimes have their inode changed, such as during some types @@ -157,7 +205,6 @@ void TailSource::run() { } else if (in.eof()) { in.clear(); } - usleep(1); } LOG_OPER("[%s] Closing tailed log file <%s>", categoryHandled.c_str(), filename.c_str()); diff --git a/src/source.h b/src/source.h index 3a4e2c0c..31987b24 100644 --- a/src/source.h +++ b/src/source.h @@ -21,6 +21,7 @@ #include "common.h" #include "conf.h" +#include "PathWatcher.h" #include #include @@ -48,7 +49,6 @@ class Source { bool validConfiguration; }; - class TailSource : public Source { public: TailSource(boost::property_tree::ptree& configuration); @@ -60,6 +60,8 @@ class TailSource : public Source { private: std::string filename; boost::iostreams::filtering_istream in; + PathWatcher pathWatcher; + bool watchPath(); }; #endif /* SCRIBE_SOURCE_H_ */