From 42548a110bc72dba61be70a9bc152871f6ecdae5 Mon Sep 17 00:00:00 2001 From: John Corwin Date: Wed, 26 Jan 2011 12:55:08 -0800 Subject: [PATCH 1/4] use inotify in TailSource instead of busy-waiting --- src/source.cpp | 29 ++++++++++++++++++++++++++--- src/source.h | 1 + 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/src/source.cpp b/src/source.cpp index e74d5614..c7753dc3 100644 --- a/src/source.cpp +++ b/src/source.cpp @@ -18,6 +18,7 @@ #include "source.h" #include "scribe_server.h" +#include using boost::shared_ptr; using boost::property_tree::ptree; @@ -65,9 +66,16 @@ void Source::stop() {} void Source::run() {} -TailSource::TailSource(ptree& configuration) : Source(configuration) {} +TailSource::TailSource(ptree& configuration) : Source(configuration) { + inotify_fd = inotify_init(); + if (inotify_fd < 0) { + LOG_OPER("inotify_init failed: %s", strerror(errno)); + } +} -TailSource::~TailSource() {} +TailSource::~TailSource() { + close(inotify_fd); +} void TailSource::configure() { Source::configure(); @@ -76,6 +84,13 @@ void TailSource::configure() { LOG_OPER("[%s] Invalid TailSource configuration! No specified.", categoryHandled.c_str()); validConfiguration = false; + } else { + int rv = inotify_add_watch(inotify_fd, filename.c_str(), IN_MODIFY); + if (rv < 0) { + LOG_OPER("Failed to add inotify watch for file %s: %s", + filename.c_str(), strerror(errno)); + validConfiguration = false; + } } } @@ -112,6 +127,15 @@ void TailSource::run() { vector messages; while (active) { + struct inotify_event event; + memset(&event, 0, sizeof(event)); + int rv = read(inotify_fd, &event, sizeof(event)); + if (rv < 0) { + LOG_OPER("Failed to read inotify event for file %s: %s", filename.c_str(), strerror(errno)); + sleep(10); + continue; + } + stat(filename.c_str(), ¤tStat); // Files sometimes have their inode changed, such as during some types @@ -157,7 +181,6 @@ void TailSource::run() { } else if (in.eof()) { in.clear(); } - usleep(1); } LOG_OPER("[%s] Closing tailed log file <%s>", categoryHandled.c_str(), filename.c_str()); diff --git a/src/source.h b/src/source.h index 3a4e2c0c..3a9f71a0 100644 --- a/src/source.h +++ b/src/source.h @@ -60,6 +60,7 @@ class TailSource : public Source { private: std::string filename; boost::iostreams::filtering_istream in; + int inotify_fd; }; #endif /* SCRIBE_SOURCE_H_ */ From 5a83a5fc0a3cce44dac75a18fd08d998a46574d0 Mon Sep 17 00:00:00 2001 From: John Corwin Date: Wed, 26 Jan 2011 13:23:41 -0800 Subject: [PATCH 2/4] conditional use of inotify if available --- configure.ac | 3 +++ src/source.cpp | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/configure.ac b/configure.ac index 0cef6247..4fdbd5eb 100644 --- a/configure.ac +++ b/configure.ac @@ -69,6 +69,9 @@ AX_BOOST_BASE([1.36]) AX_BOOST_SYSTEM AX_BOOST_FILESYSTEM +AC_CHECK_FUNCS([inotify_init], [AC_DEFINE([HAVE_INOTIFY], [1], [Check for +libinotify])]) + # Generates Makefile from Makefile.am. Modify when new subdirs are added. # Change Makefile.am also to add subdirectly. AC_CONFIG_FILES(Makefile src/Makefile lib/py/Makefile) diff --git a/src/source.cpp b/src/source.cpp index c7753dc3..134462bb 100644 --- a/src/source.cpp +++ b/src/source.cpp @@ -18,7 +18,10 @@ #include "source.h" #include "scribe_server.h" + +#ifdef HAVE_INOTIFY #include +#endif using boost::shared_ptr; using boost::property_tree::ptree; @@ -67,14 +70,20 @@ void Source::run() {} TailSource::TailSource(ptree& configuration) : Source(configuration) { +#ifdef HAVE_INOTIFY inotify_fd = inotify_init(); if (inotify_fd < 0) { LOG_OPER("inotify_init failed: %s", strerror(errno)); } +#endif } TailSource::~TailSource() { +#ifdef HAVE_INOTIFY + if (inotify_fd >= 0) { close(inotify_fd); + } +#endif } void TailSource::configure() { @@ -85,12 +94,14 @@ void TailSource::configure() { categoryHandled.c_str()); validConfiguration = false; } else { +#ifdef HAVE_INOTIFY int rv = inotify_add_watch(inotify_fd, filename.c_str(), IN_MODIFY); if (rv < 0) { LOG_OPER("Failed to add inotify watch for file %s: %s", filename.c_str(), strerror(errno)); validConfiguration = false; } +#endif } } @@ -127,14 +138,21 @@ void TailSource::run() { vector messages; while (active) { +#ifdef HAVE_INOFITY struct inotify_event event; memset(&event, 0, sizeof(event)); int rv = read(inotify_fd, &event, sizeof(event)); + LOG_DEBUG("Detected change in file %s", filename.c_str()); if (rv < 0) { LOG_OPER("Failed to read inotify event for file %s: %s", filename.c_str(), strerror(errno)); sleep(10); continue; } +#else + // inotify is not available. Sleep for one second before checking the file. + sleep(1); +#endif + stat(filename.c_str(), ¤tStat); From 0c222eaf20a8f8f41b22c01f78937dfbd764dec3 Mon Sep 17 00:00:00 2001 From: John Corwin Date: Thu, 27 Jan 2011 14:14:39 -0800 Subject: [PATCH 3/4] Refactor inotify code Watch parent path in addition to file path to detect changes to open files --- src/Makefile.am | 9 ++- src/PathWatcher.cpp | 115 +++++++++++++++++++++++++++++++ src/PathWatcher.h | 50 ++++++++++++++ src/TestPathWatcher.cpp | 145 ++++++++++++++++++++++++++++++++++++++++ src/source.cpp | 79 ++++++++++++---------- src/source.h | 6 +- 6 files changed, 363 insertions(+), 41 deletions(-) create mode 100644 src/PathWatcher.cpp create mode 100644 src/PathWatcher.h create mode 100644 src/TestPathWatcher.cpp diff --git a/src/Makefile.am b/src/Makefile.am index f6227ede..710f458f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -67,7 +67,7 @@ endif # DO NOT USE CPPFLAGS, CXXFLAGS, CFLAGS, LDFLAGS here! Set in configure.ac and|or override on command line. # USE flags AM_CXXFLAGS, AM_CFLAGS, AM_CPPFLAGS, AM_LDFLAGS, LDADD in this section. -AM_CPPFLAGS = -I.. +AM_CPPFLAGS = -I.. -g AM_CPPFLAGS += -I$(thrift_home)/include AM_CPPFLAGS += -I$(thrift_home)/include/thrift AM_CPPFLAGS += -I$(fb303_home)/include/thrift @@ -104,7 +104,7 @@ endif # Binaries -- multiple progs can be defined. bin_PROGRAMS = scribed -scribed_SOURCES = source.cpp store.cpp store_queue.cpp SourceConf.cpp conf.cpp file.cpp conn_pool.cpp scribe_server.cpp network_dynamic_config.cpp dynamic_bucket_updater.cpp url.cpp $(FB_SOURCES) $(ENV_SOURCES) +scribed_SOURCES = source.cpp store.cpp store_queue.cpp SourceConf.cpp conf.cpp file.cpp conn_pool.cpp scribe_server.cpp network_dynamic_config.cpp dynamic_bucket_updater.cpp url.cpp PathWatcher.cpp $(FB_SOURCES) $(ENV_SOURCES) if USE_SCRIBE_HDFS scribed_SOURCES += HdfsFile.cpp endif @@ -118,11 +118,14 @@ if SHARED scribed_DEPENDENCIES = libscribe.so endif -TESTS = url_test +TESTS = url_test TestPathWatcher check_PROGRAMS = $(TESTS) url_test_SOURCES = url.h url.cpp url_test.cpp url_test_CXXFLAGS = $(CPPUNIT_CFLAGS) url_test_LDFLAGS = $(CPPUNIT_LIBS) +TestPathWatcher_SOURCES = PathWatcher.h PathWatcher.cpp TestPathWatcher.cpp +TestPathWatcher_CXXFLAGS = $(CPPUNIT_CFLAGS) +TestPathWatcher_LDFLAGS = $(CPPUNIT_LIBS) # Section 4 ############################################################################## # Set up Thrift specific activity here. diff --git a/src/PathWatcher.cpp b/src/PathWatcher.cpp new file mode 100644 index 00000000..cab30d4b --- /dev/null +++ b/src/PathWatcher.cpp @@ -0,0 +1,115 @@ +#include "PathWatcher.h" + +#include + +#ifdef HAVE_INOTIFY +#include +#define EVENT_BUF_LEN (1024 * (sizeof(event) + 16)) +#define FILE_WATCH_EVENTS (IN_MODIFY | IN_DELETE_SELF | IN_MOVE_SELF) +#define DIR_WATCH_EVENTS (IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MOVE_SELF | IN_MOVED_FROM) +#endif + +PathWatcher::PathWatcher() : inotify_file_wd(-1), inotify_dir_wd(-1) { +#ifdef HAVE_INOTIFY + inotify_fd = inotify_init(); + if (inotify_fd < 0) { + LOG_OPER("inotify_init failed: %s", strerror(errno)); + } +#endif +} + +PathWatcher::~PathWatcher() { +#ifdef HAVE_INOTIFY + if (inotify_fd >= 0) { + close(inotify_fd); + } +#endif +} + +// Clear any existing watched file or directory. +void PathWatcher::clearWatches() { +#ifdef HAVE_INOTIFY + if (inotify_file_wd >= 0) { + LOG_OPER("Deleting existing file watch"); + inotify_rm_watch(inotify_fd, inotify_file_wd); + inotify_file_wd = -1; + } + + if (inotify_dir_wd >= 0) { + LOG_OPER("Deleting existing directory watch"); + inotify_rm_watch(inotify_fd, inotify_dir_wd); + inotify_dir_wd = -1; + } + watchedFile.clear(); +#endif +} + +bool PathWatcher::tryWatchFile(const std::string & path) { +#ifdef HAVE_INOTIFY + clearWatches(); + boost::filesystem::path watchedPath(path); + inotify_file_wd = inotify_add_watch(inotify_fd, path.c_str(), FILE_WATCH_EVENTS); + if (inotify_file_wd >= 0) { + std::string parentDir = watchedPath.parent_path().string(); + inotify_dir_wd = inotify_add_watch(inotify_fd, parentDir.c_str(), DIR_WATCH_EVENTS); + LOG_OPER("Set inotify watch for file %s with parent directory %s", path.c_str(), parentDir.c_str()); + watchedFile = watchedPath.filename(); + return true; + } + return false; +#endif + return true; +} + +bool PathWatcher::tryWatchDirectory(const std::string & path) { +#ifdef HAVE_INOTIFY + clearWatches(); + LOG_OPER("Attempting to watch path %s", path.c_str()); + inotify_dir_wd = inotify_add_watch(inotify_fd, path.c_str(), DIR_WATCH_EVENTS); + if (inotify_dir_wd >= 0) { + LOG_OPER("Watching path %s", path.c_str()); + return true; + } + return false; +#endif + return true; +} + +void PathWatcher::waitForEvent(bool & fileEvent, bool & rewatch) { +#ifdef HAVE_INOTIFY + rewatch = false; + fileEvent = false; + + char eventBuf[EVENT_BUF_LEN]; + int rv = read(inotify_fd, eventBuf, EVENT_BUF_LEN); + if (rv < 0) { + LOG_OPER("Failed to read inotify event: %s", strerror(errno)); + rewatch = true; + return; + } + int i = 0; + while (i < rv) { + struct inotify_event *event; + event = (struct inotify_event *) &eventBuf[i]; + if (inotify_file_wd != -1 && inotify_file_wd == event->wd) { + // File event + fileEvent = true; + } else if (inotify_file_wd != -1 && inotify_dir_wd == event->wd) { + // Directory event with a watched file + std::string alteredFile(event->name); + if (alteredFile == watchedFile) { + rewatch = true; + } + } else if (inotify_dir_wd == event->wd) { + // Directory event with no existing file + rewatch = true; + } + i += sizeof(inotify_event) + event->len; + } +#else + // inotify is not available. Sleep for one second before checking the file. + rewatch = false; + fileEvent = true; + sleep(1); +#endif +} diff --git a/src/PathWatcher.h b/src/PathWatcher.h new file mode 100644 index 00000000..b8064cc9 --- /dev/null +++ b/src/PathWatcher.h @@ -0,0 +1,50 @@ +/** + * Copyright 2010 Twitter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author John Corwin + */ + +#ifndef PATH_WATCHER_H + +#include "common.h" + +class PathWatcher { + public: + PathWatcher(); + ~PathWatcher(); + + // Attempt to watch a file. Return true if successful. + bool tryWatchFile(const std::string & path); + + // Attempt to watch a directory. Return true if successful. + bool tryWatchDirectory(const std::string & path); + + /** + * Wait for events. + * fileEvent will be set if the watched file was modified. + * rewatch will be set if a change to the file or parent directory + * requires rewatching. + */ + void waitForEvent(bool & fileEvent, bool & rewatch); + + private: + void clearWatches(); + int inotify_fd; + int inotify_file_wd; + int inotify_dir_wd; + std::string watchedFile; +}; + +#endif diff --git a/src/TestPathWatcher.cpp b/src/TestPathWatcher.cpp new file mode 100644 index 00000000..6b58e90e --- /dev/null +++ b/src/TestPathWatcher.cpp @@ -0,0 +1,145 @@ +#include "PathWatcher.h" + +#include +#include +#include + +class TestPathWatcher : public CppUnit::TestCase { +public: + CPPUNIT_TEST_SUITE(TestPathWatcher); + CPPUNIT_TEST(testFileModified); + CPPUNIT_TEST(testFileCreated); + CPPUNIT_TEST(testFileDeleted); + CPPUNIT_TEST(testFileMoved); + CPPUNIT_TEST_SUITE_END(); + +public: + std::string tempDir; + + TestPathWatcher() { + std::ostringstream dir; + time_t ts = time(NULL); + dir << "/tmp/scribe-test-" << ts; + tempDir = dir.str(); + mkdir(tempDir.c_str(), S_IRWXU | S_IRWXG); + } + + void testFileModified() { + // Create and watch a file. + std::string file = tempDir + "/test.txt"; + std::fstream f(file.c_str(), std::ios_base::out); + f << "line1" << std::endl; + PathWatcher pathWatcher; + std::cout << "Attempting to watch " << file << std::endl; + CPPUNIT_ASSERT(pathWatcher.tryWatchFile(file)); + + // Modify the file. + f << "line2" << std::endl; + + // Make sure we're notified. + bool fileEvent, rewatch; + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(fileEvent); + CPPUNIT_ASSERT(!rewatch); + f.close(); + unlink(file.c_str()); + } + + void testFileCreated() { + std::string file = tempDir + "/test.txt"; + PathWatcher pathWatcher; + + // Watch a non-existing file. + std::cout << "Attempting to watch " << file << std::endl; + CPPUNIT_ASSERT(!pathWatcher.tryWatchFile(file)); + CPPUNIT_ASSERT(pathWatcher.tryWatchDirectory(tempDir)); + + // Create the file. + std::fstream f(file.c_str(), std::ios_base::out); + f << "line1" << std::endl; + bool fileEvent, rewatch; + + // Make sure we're notified of the file creation. + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(!fileEvent); + CPPUNIT_ASSERT(rewatch); + + // Make sure we're notified when the file changes. + CPPUNIT_ASSERT(pathWatcher.tryWatchFile(file)); + f << "line2" << std::endl; + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(fileEvent); + f.close(); + unlink(file.c_str()); + } + + void testFileDeleted() { + std::string file = tempDir + "/test.txt"; + + // Create and watch a file. + std::fstream f(file.c_str(), std::ios_base::out); + f << "line1" << std::endl; + PathWatcher pathWatcher; + std::cout << "Attempting to watch " << file << std::endl; + CPPUNIT_ASSERT(pathWatcher.tryWatchFile(file)); + + // Delete the file. + unlink(file.c_str()); + + // Make sure we're notified. + bool fileEvent, rewatch; + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(!fileEvent); + CPPUNIT_ASSERT(rewatch); + f.close(); + } + + void testFileMoved() { + // Create and watch a file 'test.txt'. + std::string file = tempDir + "/test.txt"; + std::fstream f(file.c_str(), std::ios_base::out); + f << "line1" << std::endl; + f.close(); + + PathWatcher pathWatcher; + std::cout << "Attempting to watch " << file << std::endl; + CPPUNIT_ASSERT(pathWatcher.tryWatchFile(file)); + + // Move 'test.txt' to 'moved.txt'. + std::string movedFile = tempDir + "/moved.txt"; + rename(file.c_str(), movedFile.c_str()); + bool fileEvent, rewatch; + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(fileEvent); + CPPUNIT_ASSERT(rewatch); + CPPUNIT_ASSERT(!pathWatcher.tryWatchFile(file)); + CPPUNIT_ASSERT(pathWatcher.tryWatchDirectory(tempDir)); + + // Move 'moved.txt' back to 'test.txt'. + rename(movedFile.c_str(), file.c_str()); + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(!fileEvent); + CPPUNIT_ASSERT(rewatch); + CPPUNIT_ASSERT(pathWatcher.tryWatchFile(file)); + + // Make sure we still get notified on change to 'test.txt'. + std::fstream f2(file.c_str(), std::ios_base::out); + f2 << "line2" << std::endl; + pathWatcher.waitForEvent(fileEvent, rewatch); + CPPUNIT_ASSERT(fileEvent); + CPPUNIT_ASSERT(!rewatch); + f2.close(); + unlink(file.c_str()); + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(TestPathWatcher); + +int main(int argc, char **argv) +{ + CppUnit::TextUi::TestRunner runner; + CppUnit::TestFactoryRegistry ®istry = CppUnit::TestFactoryRegistry::getRegistry(); + runner.addTest( registry.makeTest() ); + runner.run(); + return 0; +} diff --git a/src/source.cpp b/src/source.cpp index 134462bb..9ace8bc1 100644 --- a/src/source.cpp +++ b/src/source.cpp @@ -19,10 +19,6 @@ #include "source.h" #include "scribe_server.h" -#ifdef HAVE_INOTIFY -#include -#endif - using boost::shared_ptr; using boost::property_tree::ptree; using namespace scribe::thrift; @@ -68,22 +64,10 @@ void Source::stop() {} void Source::run() {} - TailSource::TailSource(ptree& configuration) : Source(configuration) { -#ifdef HAVE_INOTIFY - inotify_fd = inotify_init(); - if (inotify_fd < 0) { - LOG_OPER("inotify_init failed: %s", strerror(errno)); - } -#endif } TailSource::~TailSource() { -#ifdef HAVE_INOTIFY - if (inotify_fd >= 0) { - close(inotify_fd); - } -#endif } void TailSource::configure() { @@ -94,17 +78,46 @@ void TailSource::configure() { categoryHandled.c_str()); validConfiguration = false; } else { -#ifdef HAVE_INOTIFY - int rv = inotify_add_watch(inotify_fd, filename.c_str(), IN_MODIFY); - if (rv < 0) { - LOG_OPER("Failed to add inotify watch for file %s: %s", - filename.c_str(), strerror(errno)); + if (!watchPath()) { validConfiguration = false; } -#endif } } +bool TailSource::watchPath() { + if (pathWatcher.tryWatchFile(filename)) { + return true; + } + // File watch failed. Try watching each parent directory. + LOG_OPER("Unable to watch %s. Attempting to watch parent directories.", filename.c_str()); + boost::filesystem::path fullPath(filename); + deque pathStack; + boost::filesystem::path::iterator pathIter = fullPath.begin(); + while (pathIter != fullPath.end()) { + pathStack.push_back(*pathIter); + pathIter++; + } + // We already failed to watch the file. Remove it from the path stack. + pathStack.pop_back(); + + // Attempt to watch a parent path of the file. + while (!pathStack.empty()) { + boost::filesystem::path pathToWatch; + for (deque::iterator elem = pathStack.begin(); elem != pathStack.end(); elem++) { + pathToWatch /= *elem; + } + string strPathToWatch = pathToWatch.string(); + LOG_OPER("Attempting to watch path %s", strPathToWatch.c_str()); + if (pathWatcher.tryWatchDirectory(strPathToWatch)) { + return true; + } + pathStack.pop_back(); + } + LOG_OPER("Failed to watch any parent paths of %s", filename.c_str()); + sleep(10); + return false; +} + void TailSource::start() { active = true; pthread_create(&sourceThread, NULL, sourceStarter, (void*) this); @@ -138,21 +151,15 @@ void TailSource::run() { vector messages; while (active) { -#ifdef HAVE_INOFITY - struct inotify_event event; - memset(&event, 0, sizeof(event)); - int rv = read(inotify_fd, &event, sizeof(event)); - LOG_DEBUG("Detected change in file %s", filename.c_str()); - if (rv < 0) { - LOG_OPER("Failed to read inotify event for file %s: %s", filename.c_str(), strerror(errno)); - sleep(10); - continue; + bool fileEvent; + bool rewatch; + pathWatcher.waitForEvent(fileEvent, rewatch); + if (rewatch) { + watchPath(); + } + if (!fileEvent) { + continue; } -#else - // inotify is not available. Sleep for one second before checking the file. - sleep(1); -#endif - stat(filename.c_str(), ¤tStat); diff --git a/src/source.h b/src/source.h index 3a9f71a0..62392154 100644 --- a/src/source.h +++ b/src/source.h @@ -21,6 +21,7 @@ #include "common.h" #include "conf.h" +#include "PathWatcher.h" #include #include @@ -48,7 +49,6 @@ class Source { bool validConfiguration; }; - class TailSource : public Source { public: TailSource(boost::property_tree::ptree& configuration); @@ -60,7 +60,9 @@ class TailSource : public Source { private: std::string filename; boost::iostreams::filtering_istream in; - int inotify_fd; + PathWatcher pathWatcher; + bool watchPath(); + bool waitForEvent(); }; #endif /* SCRIBE_SOURCE_H_ */ From 105481a3f3cbfcf7c1eacb5fcb19f4bdeef941c9 Mon Sep 17 00:00:00 2001 From: John Corwin Date: Thu, 27 Jan 2011 17:02:58 -0800 Subject: [PATCH 4/4] Add shutdown method to PathWatcher to interrupt blocking read call --- configure.ac | 3 +-- src/Makefile.am | 2 +- src/PathWatcher.cpp | 55 +++++++++++++++++++++++++++++++++++------ src/PathWatcher.h | 5 ++++ src/TestPathWatcher.cpp | 20 ++++++++++++++- src/source.cpp | 9 +++---- src/source.h | 1 - 7 files changed, 78 insertions(+), 17 deletions(-) diff --git a/configure.ac b/configure.ac index 4fdbd5eb..604e760f 100644 --- a/configure.ac +++ b/configure.ac @@ -69,8 +69,7 @@ AX_BOOST_BASE([1.36]) AX_BOOST_SYSTEM AX_BOOST_FILESYSTEM -AC_CHECK_FUNCS([inotify_init], [AC_DEFINE([HAVE_INOTIFY], [1], [Check for -libinotify])]) +AC_CHECK_FUNCS([inotify_init], [AC_DEFINE([HAVE_INOTIFY], [1], [Check for libinotify])]) # Generates Makefile from Makefile.am. Modify when new subdirs are added. # Change Makefile.am also to add subdirectly. diff --git a/src/Makefile.am b/src/Makefile.am index 710f458f..123879a7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -67,7 +67,7 @@ endif # DO NOT USE CPPFLAGS, CXXFLAGS, CFLAGS, LDFLAGS here! Set in configure.ac and|or override on command line. # USE flags AM_CXXFLAGS, AM_CFLAGS, AM_CPPFLAGS, AM_LDFLAGS, LDADD in this section. -AM_CPPFLAGS = -I.. -g +AM_CPPFLAGS = -I.. AM_CPPFLAGS += -I$(thrift_home)/include AM_CPPFLAGS += -I$(thrift_home)/include/thrift AM_CPPFLAGS += -I$(fb303_home)/include/thrift diff --git a/src/PathWatcher.cpp b/src/PathWatcher.cpp index cab30d4b..36ff5236 100644 --- a/src/PathWatcher.cpp +++ b/src/PathWatcher.cpp @@ -1,3 +1,21 @@ +/** + * Copyright 2010 Twitter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author John Corwin + */ + #include "PathWatcher.h" #include @@ -9,8 +27,9 @@ #define DIR_WATCH_EVENTS (IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MOVE_SELF | IN_MOVED_FROM) #endif -PathWatcher::PathWatcher() : inotify_file_wd(-1), inotify_dir_wd(-1) { +PathWatcher::PathWatcher() : inotify_file_wd(-1), inotify_dir_wd(-1), active(true) { #ifdef HAVE_INOTIFY + pthread_mutex_init(&watchMutex, NULL); inotify_fd = inotify_init(); if (inotify_fd < 0) { LOG_OPER("inotify_init failed: %s", strerror(errno)); @@ -23,12 +42,14 @@ PathWatcher::~PathWatcher() { if (inotify_fd >= 0) { close(inotify_fd); } + pthread_mutex_destroy(&watchMutex); #endif } // Clear any existing watched file or directory. void PathWatcher::clearWatches() { #ifdef HAVE_INOTIFY + pthread_mutex_lock(&watchMutex); if (inotify_file_wd >= 0) { LOG_OPER("Deleting existing file watch"); inotify_rm_watch(inotify_fd, inotify_file_wd); @@ -41,38 +62,48 @@ void PathWatcher::clearWatches() { inotify_dir_wd = -1; } watchedFile.clear(); + pthread_mutex_unlock(&watchMutex); #endif } bool PathWatcher::tryWatchFile(const std::string & path) { + bool watched = false; + #ifdef HAVE_INOTIFY clearWatches(); boost::filesystem::path watchedPath(path); + + pthread_mutex_lock(&watchMutex); inotify_file_wd = inotify_add_watch(inotify_fd, path.c_str(), FILE_WATCH_EVENTS); if (inotify_file_wd >= 0) { std::string parentDir = watchedPath.parent_path().string(); inotify_dir_wd = inotify_add_watch(inotify_fd, parentDir.c_str(), DIR_WATCH_EVENTS); LOG_OPER("Set inotify watch for file %s with parent directory %s", path.c_str(), parentDir.c_str()); watchedFile = watchedPath.filename(); - return true; + watched = true; } - return false; + pthread_mutex_unlock(&watchMutex); #endif - return true; + + return watched; } bool PathWatcher::tryWatchDirectory(const std::string & path) { + bool watched = false; + #ifdef HAVE_INOTIFY clearWatches(); LOG_OPER("Attempting to watch path %s", path.c_str()); + pthread_mutex_lock(&watchMutex); inotify_dir_wd = inotify_add_watch(inotify_fd, path.c_str(), DIR_WATCH_EVENTS); if (inotify_dir_wd >= 0) { LOG_OPER("Watching path %s", path.c_str()); - return true; + watched = true; } - return false; + pthread_mutex_unlock(&watchMutex); #endif - return true; + + return watched; } void PathWatcher::waitForEvent(bool & fileEvent, bool & rewatch) { @@ -81,6 +112,9 @@ void PathWatcher::waitForEvent(bool & fileEvent, bool & rewatch) { fileEvent = false; char eventBuf[EVENT_BUF_LEN]; + if (!active) { + return; + } int rv = read(inotify_fd, eventBuf, EVENT_BUF_LEN); if (rv < 0) { LOG_OPER("Failed to read inotify event: %s", strerror(errno)); @@ -113,3 +147,10 @@ void PathWatcher::waitForEvent(bool & fileEvent, bool & rewatch) { sleep(1); #endif } + +void PathWatcher::shutdown() { +#ifdef HAVE_INOTIFY + active = false; + clearWatches(); +#endif +} diff --git a/src/PathWatcher.h b/src/PathWatcher.h index b8064cc9..75cae011 100644 --- a/src/PathWatcher.h +++ b/src/PathWatcher.h @@ -39,11 +39,16 @@ class PathWatcher { */ void waitForEvent(bool & fileEvent, bool & rewatch); + // Interrupts the thread waiting for events. + void shutdown(); + private: void clearWatches(); int inotify_fd; int inotify_file_wd; int inotify_dir_wd; + bool volatile active; + pthread_mutex_t watchMutex; std::string watchedFile; }; diff --git a/src/TestPathWatcher.cpp b/src/TestPathWatcher.cpp index 6b58e90e..1430d6fc 100644 --- a/src/TestPathWatcher.cpp +++ b/src/TestPathWatcher.cpp @@ -1,3 +1,21 @@ +/** + * Copyright 2010 Twitter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author John Corwin + */ + #include "PathWatcher.h" #include @@ -75,7 +93,7 @@ class TestPathWatcher : public CppUnit::TestCase { void testFileDeleted() { std::string file = tempDir + "/test.txt"; - + // Create and watch a file. std::fstream f(file.c_str(), std::ios_base::out); f << "line1" << std::endl; diff --git a/src/source.cpp b/src/source.cpp index 9ace8bc1..12116b72 100644 --- a/src/source.cpp +++ b/src/source.cpp @@ -64,11 +64,9 @@ void Source::stop() {} void Source::run() {} -TailSource::TailSource(ptree& configuration) : Source(configuration) { -} +TailSource::TailSource(ptree& configuration) : Source(configuration) {} -TailSource::~TailSource() { -} +TailSource::~TailSource() {} void TailSource::configure() { Source::configure(); @@ -114,7 +112,6 @@ bool TailSource::watchPath() { pathStack.pop_back(); } LOG_OPER("Failed to watch any parent paths of %s", filename.c_str()); - sleep(10); return false; } @@ -125,6 +122,8 @@ void TailSource::start() { void TailSource::stop() { active = false; + LOG_OPER("Shutting down TailSource thread for %s", filename.c_str()); + pathWatcher.shutdown(); pthread_join(sourceThread, NULL); } diff --git a/src/source.h b/src/source.h index 62392154..31987b24 100644 --- a/src/source.h +++ b/src/source.h @@ -62,7 +62,6 @@ class TailSource : public Source { boost::iostreams::filtering_istream in; PathWatcher pathWatcher; bool watchPath(); - bool waitForEvent(); }; #endif /* SCRIBE_SOURCE_H_ */