-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwatcher.go
More file actions
148 lines (129 loc) · 3.03 KB
/
watcher.go
File metadata and controls
148 lines (129 loc) · 3.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package main
import (
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
)
// FileEvent represents a change detected in a JSONL file.
type FileEvent struct {
	// Path is the path of the file the data was read from.
	Path string
	// NewData holds the bytes read from the previously recorded offset up to
	// the end of the file at the time of the read.
	NewData []byte
}
// Watcher monitors JSONL files under the Claude projects directory.
type Watcher struct {
	// dir is the root directory; Run watches it and every directory below it.
	dir string
	// debounce is how long to wait after the latest write event for a file
	// before reading it.
	debounce time.Duration
	// fileCh carries FileEvent values to the consumer; Events exposes it
	// receive-only.
	fileCh chan FileEvent
	// offsets maps a file path to the byte offset up to which that file has
	// already been read. Accessed under mu.
	offsets map[string]int64
	// mu guards offsets and timers.
	mu sync.Mutex
	// timers maps a file path to the debounce timer most recently scheduled
	// for it. Accessed under mu.
	timers map[string]*time.Timer
}
// NewWatcher returns a Watcher rooted at dir that waits for debounce after the
// latest write to a file before reading it. The returned Watcher does nothing
// until Run is called. Its event channel is buffered with room for 16 events.
func NewWatcher(dir string, debounce time.Duration) *Watcher {
	w := new(Watcher)
	w.dir = dir
	w.debounce = debounce
	w.fileCh = make(chan FileEvent, 16)
	w.offsets = map[string]int64{}
	w.timers = map[string]*time.Timer{}
	return w
}
// Events returns the receive-only channel on which file events are delivered.
// Nothing in this file closes the channel, so consumers should not wait for it
// to be closed as a shutdown signal.
func (w *Watcher) Events() <-chan FileEvent {
	return w.fileCh
}
// Run watches w.dir and every directory beneath it for changes. It blocks
// until done is closed (or delivers a value), or until fsnotify closes its
// event or error channel; in all of those cases it returns nil. The only
// non-nil return is a failure to create the underlying fsnotify watcher.
//
// Directories created while Run is active are added to the watch set. A write
// event on a file whose name ends in ".jsonl" and whose base name does not
// start with "agent-" schedules a debounced read of that file. Errors reported
// by fsnotify are logged and do not stop the loop.
//
// Debounced reads that have not fired yet when Run returns are cancelled, so
// no read is started after shutdown.
func (w *Watcher) Run(done <-chan struct{}) error {
	fsw, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer fsw.Close()

	// Stop timers that are still pending on exit. Left alone they would fire
	// after Run has returned, and each resulting read could block forever
	// sending to an event channel that nobody is draining any more.
	defer func() {
		w.mu.Lock()
		defer w.mu.Unlock()
		for path, t := range w.timers {
			t.Stop()
			delete(w.timers, path)
		}
	}()

	// Walk existing subdirectories and add them. A failure here is not fatal:
	// whatever could be added is still watched.
	if err := w.addDirs(fsw, w.dir); err != nil {
		log.Printf("warning: could not walk %s: %v", w.dir, err)
	}

	for {
		select {
		case <-done:
			return nil

		case ev, ok := <-fsw.Events:
			if !ok {
				return nil
			}
			// fsnotify watches are not recursive, so a newly created
			// directory (and anything already inside it) is added explicitly.
			if ev.Has(fsnotify.Create) {
				if info, err := os.Stat(ev.Name); err == nil && info.IsDir() {
					if err := w.addDirs(fsw, ev.Name); err != nil {
						log.Printf("warning: could not walk %s: %v", ev.Name, err)
					}
				}
			}
			if ev.Has(fsnotify.Write) && strings.HasSuffix(ev.Name, ".jsonl") &&
				!strings.HasPrefix(filepath.Base(ev.Name), "agent-") {
				w.scheduleRead(ev.Name)
			}

		case err, ok := <-fsw.Errors:
			if !ok {
				return nil
			}
			log.Printf("watcher error: %v", err)
		}
	}
}
// addDirs registers root and every directory below it with fsw. Entries that
// cannot be inspected are skipped, and a directory that cannot be added is
// logged and skipped; neither aborts the walk, because the callback always
// returns nil.
func (w *Watcher) addDirs(fsw *fsnotify.Watcher, root string) error {
	visit := func(p string, fi os.FileInfo, walkErr error) error {
		// Skip inaccessible entries and anything that is not a directory.
		// walkErr is tested first because fi may be nil when it is set.
		if walkErr != nil || !fi.IsDir() {
			return nil
		}
		if err := fsw.Add(p); err != nil {
			log.Printf("warning: cannot watch %s: %v", p, err)
		}
		return nil
	}
	return filepath.Walk(root, visit)
}
// scheduleRead arranges for path to be read once w.debounce has elapsed with
// no further call for the same path. Each call stops the timer previously
// scheduled for path, if any, and starts a new one, so a burst of write events
// results in a single read after the burst ends.
//
// When a timer fires it removes its own entry from w.timers before reading, so
// the map does not keep one dead timer per path that was ever written.
func (w *Watcher) scheduleRead(path string) {
	w.mu.Lock()
	defer w.mu.Unlock()

	if prev, ok := w.timers[path]; ok {
		prev.Stop()
	}

	// t is declared before AfterFunc so the callback can identify itself. The
	// callback takes w.mu before touching t, and this function holds w.mu
	// until after t is assigned, so the callback never observes t unset.
	var t *time.Timer
	t = time.AfterFunc(w.debounce, func() {
		w.mu.Lock()
		// Only drop the entry if it is still this timer; a newer call may
		// already have replaced it.
		if w.timers[path] == t {
			delete(w.timers, path)
		}
		w.mu.Unlock()

		w.readNewData(path)
	})
	w.timers[path] = t
}
// readNewData reads whatever has been appended to path since the last
// successful read and delivers it on the event channel as a FileEvent.
//
// Reading starts at the offset recorded for path (zero for a file that has not
// been read before). If the file is now shorter than that offset it has been
// truncated or replaced, so reading restarts from the beginning; without this
// the seek would land past the end and every later read would return nothing.
//
// Open, stat, seek and read failures are logged and leave the recorded offset
// unchanged, so the next write event retries from the same position. When
// there is no new data, no event is sent.
//
// The offset is loaded and stored under w.mu, but the file I/O and the channel
// send happen outside the lock. The send blocks until the consumer has room.
//
// NOTE(review): data is delivered up to end of file, so an event may end in
// the middle of a line if the writer had not finished it — confirm the
// consumer tolerates that.
func (w *Watcher) readNewData(path string) {
	w.mu.Lock()
	offset := w.offsets[path]
	w.mu.Unlock()

	f, err := os.Open(path)
	if err != nil {
		log.Printf("cannot open %s: %v", path, err)
		return
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		log.Printf("stat error for %s: %v", path, err)
		return
	}
	// A file smaller than the recorded offset was truncated or replaced.
	truncated := info.Size() < offset
	if truncated {
		offset = 0
	}

	// Seek to the last known offset.
	if offset > 0 {
		if _, err := f.Seek(offset, io.SeekStart); err != nil {
			log.Printf("seek error for %s: %v", path, err)
			return
		}
	}

	data, err := io.ReadAll(f)
	if err != nil {
		log.Printf("read error for %s: %v", path, err)
		return
	}
	if len(data) == 0 && !truncated {
		return
	}

	// Store the new position. After a truncation this also runs when nothing
	// was read, so the reset to the start of the file is remembered.
	w.mu.Lock()
	w.offsets[path] = offset + int64(len(data))
	w.mu.Unlock()

	if len(data) == 0 {
		return
	}
	w.fileCh <- FileEvent{Path: path, NewData: data}
}