Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions core/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"os/exec"
"path/filepath"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -1511,17 +1510,6 @@ func getFiles(ctx context.Context, t Driver) Files {
return files
}

// Merge returns a copy of the receiver where f takes the place of the first
// entry that has the same Name. When no entry has that Name, f is appended
// to the copy. The receiver itself is not modified.
func (t Files) Merge(f File) Files {
	merged := slices.Clone(t)
	for i := range merged {
		if merged[i].Name != f.Name {
			continue
		}
		merged[i] = f
		return merged
	}
	return append(merged, f)
}

func (t Files) Lookup(name string) (File, bool) {
for _, file := range t {
if file.Name == name {
Expand Down
162 changes: 142 additions & 20 deletions daemon/imon/fetch_resource_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,46 @@ import (
"github.com/opensvc/om3/v3/daemon/msgbus"
)

func (t *filesManager) Fetched() resource.Files {
l := make(resource.Files, len(t.fetched))
// ridFile couples a resource file with the rid it is reported under.
type ridFile struct {
	*resource.File
	rid string
}

// ridFiles is a list of ridFile entries.
type ridFiles []ridFile

// filesManager holds the state of the resource files fetch machinery.
type filesManager struct {
	// fetched caches the resource files already fetched, so that an
	// unneeded refetch can be avoided.
	fetched map[string]ridFile

	// attention keeps a pending InstanceStatusUpdated event that was
	// received while the fetch manager was still busy with a previous
	// event. A non-nil value is the signal to start a new fetch cycle as
	// soon as the current one completes.
	attention *msgbus.InstanceStatusUpdated

	// fetching is true for as long as the resource files fetch and ingest
	// routine is running.
	fetching bool
}

// newFilesManager allocates a filesManager whose fetched map is initialized
// and ready to be written to. The other fields keep their zero value.
func newFilesManager() *filesManager {
	m := new(filesManager)
	m.fetched = make(map[string]ridFile)
	return m
}

// Lookup returns the first entry of the list whose Name equals name, and
// true. When no entry matches, it returns a zero ridFile and false.
func (t ridFiles) Lookup(name string) (ridFile, bool) {
	for i := range t {
		if t[i].Name != name {
			continue
		}
		return t[i], true
	}
	return ridFile{}, false
}

func (t *filesManager) Fetched() ridFiles {
l := make(ridFiles, len(t.fetched))
i := 0
for _, v := range t.fetched {
l[i] = v
Expand All @@ -33,18 +71,25 @@ func (t *filesManager) Fetched() resource.Files {
}

func (t *Manager) handleResourceFiles(ev *msgbus.InstanceStatusUpdated) {
if t.instConfig.ActorConfig == nil {
return
}
if ev.Node == t.localhost {
t.handleLocalResourceFiles(ev)
return
}
if !t.needFetchResourceFiles(ev) {
return
}
if t.files.fetching {
t.files.attention = ev
return
}
t.doHandleResourceFiles(ev)
t.handlePeerResourceFiles(ev)
}

func (t *Manager) doHandleResourceFiles(ev *msgbus.InstanceStatusUpdated) {
func (t *Manager) handlePeerResourceFiles(ev *msgbus.InstanceStatusUpdated) {
t.files.attention = nil
if !t.needFetchResourceFiles(ev) {
return
}
t.files.fetching = true
go t.fetchResourceFiles(t.files.Fetched(), t.instStatus[t.localhost], ev)
}
Expand All @@ -54,22 +99,92 @@ func (t *Manager) needFetchResourceFiles(ev *msgbus.InstanceStatusUpdated) bool
if t.state.LocalExpect != instance.MonitorLocalExpectNone {
return false
}
if t.instConfig.ActorConfig == nil {
return false
}

// Is the remote node a valid resource file authority
if ev.Node == t.localhost {
return false
}
if ev.Value.Avail != status.Up {
return false
}

return true
}

func (t *Manager) fetchResourceFiles(fetched resource.Files, localInstanceStatus instance.Status, ev *msgbus.InstanceStatusUpdated) {
// initLocalResourceFiles seeds the fetched files cache with every resource
// file listed in the cached local instance status. Each file is logged as
// discovered and stored under its name together with its rid.
//
// It does nothing when no local instance status is cached.
func (t *Manager) initLocalResourceFiles() {
	instanceStatus, ok := t.instStatus[t.localhost]
	if !ok {
		return
	}
	for rid, localResourceStatus := range instanceStatus.Resources {
		for _, f := range localResourceStatus.Files {
			// The cache outlives the loop and keeps a pointer to the
			// file: point to a per-iteration copy so that entries never
			// alias each other, whatever the loop variable semantics of
			// the Go version the module is built with.
			file := f
			t.log.Infof("%s: file %s discovered (csum=%s, mtime=%s)", rid, file.Name, file.Checksum, file.Mtime)
			t.files.fetched[file.Name] = ridFile{
				rid:  rid,
				File: &file,
			}
		}
	}
}

// handleLocalResourceFiles reconciles the fetched files cache with the
// resource files reported by an InstanceStatusUpdated event of the local
// instance.
//
// While the local instance is not up, a file reported for the first time is
// added to the cache, and a file whose checksum differs from the cached one
// is flagged as altered. Cached files that are no longer reported are
// removed from the cache, whatever the local availability.
//
// If any of these changes was detected and the local instance is not up, a
// resource files handling is triggered for one peer instance that is up (the
// first one met while iterating over the instance statuses of the object).
func (t *Manager) handleLocalResourceFiles(ev *msgbus.InstanceStatusUpdated) {
	var needFetch bool

	// The local availability is constant for the whole event processing.
	localUp := ev.Value.Avail == status.Up

	// Build the set of filenames reported in the local instance status.
	existingFilenames := make(map[string]struct{})
	for rid, localResourceStatus := range ev.Value.Resources {
		for _, f := range localResourceStatus.Files {
			existingFilenames[f.Name] = struct{}{}

			if localUp {
				continue
			}

			fetchedFile, ok := t.files.fetched[f.Name]
			switch {
			case !ok:
				t.log.Infof("%s: file %s discovered (csum=%s, mtime=%s)", rid, f.Name, f.Checksum, f.Mtime)
				// The cache keeps a pointer to the file: point to a
				// per-iteration copy so that entries never alias each
				// other, whatever the loop variable semantics of the Go
				// version the module is built with.
				file := f
				t.files.fetched[f.Name] = ridFile{
					rid:  rid,
					File: &file,
				}
				needFetch = true
			case fetchedFile.Checksum != f.Checksum:
				t.log.Infof("%s: file %s altered locally (csum=%s, mtime=%s)", rid, f.Name, f.Checksum, f.Mtime)
				needFetch = true
			}
		}
	}

	// Forget the cached files that are no longer reported. Removing map
	// entries while ranging over the same map is allowed by the language.
	for filename, fetchedFile := range t.files.fetched {
		if _, ok := existingFilenames[filename]; ok {
			continue
		}
		t.log.Infof("%s: file %s disappeared", fetchedFile.rid, filename)
		delete(t.files.fetched, filename)
		needFetch = true
	}

	if !needFetch || localUp {
		return
	}

	// A file was changed, removed or discovered: fetch from an up instance.
	for nodename, instanceStatus := range instance.StatusData.GetByPath(t.path) {
		if t.localhost == nodename {
			continue
		}
		if instanceStatus.Avail != status.Up {
			continue
		}
		t.handleResourceFiles(&msgbus.InstanceStatusUpdated{
			Node:  nodename,
			Value: *instanceStatus,
		})
		break
	}
}

func (t *Manager) fetchResourceFiles(fetched ridFiles, localInstanceStatus instance.Status, ev *msgbus.InstanceStatusUpdated) {

var (
ridsToIngest []string
Expand Down Expand Up @@ -97,7 +212,11 @@ func (t *Manager) fetchResourceFiles(fetched resource.Files, localInstanceStatus
if !ok {
// fallback to the t.instStatus cache, ie we never fetched
// this file yet.
localFile, _ = localResourceStatus.Files.Lookup(peerFile.Name)
file, _ := localResourceStatus.Files.Lookup(peerFile.Name)
localFile = ridFile{
File: &file,
rid: rid,
}
}
if !peerFile.Mtime.After(localFile.Mtime) {
continue
Expand All @@ -106,11 +225,14 @@ func (t *Manager) fetchResourceFiles(fetched resource.Files, localInstanceStatus
continue
}
if err := t.fetchResourceFile(rid, peerFile, ev.Node); err != nil {
t.log.Warnf("%s", err)
t.log.Warnf("%s: fetch %s: %s", rid, peerFile.Name, err)
continue
}

done.Files = append(done.Files, peerFile)
done.Files = append(done.Files, ridFile{
File: &peerFile,
rid: rid,
})

if peerFile.Ingest {
ridsToIngest = append(ridsToIngest, rid)
Expand All @@ -130,7 +252,7 @@ func (t *Manager) fetchResourceFiles(fetched resource.Files, localInstanceStatus
}

func (t *Manager) fetchResourceFile(rid string, peerFile resource.File, from string) error {
t.log.Infof("%s: fetch %s from %s", rid, peerFile.Name, from)
t.log.Infof("%s: file %s fetch from %s", rid, peerFile.Name, from)
c, err := client.New(
client.WithURL(daemonsubsystem.PeerURL(from)),
client.WithUsername(t.localhost),
Expand All @@ -151,7 +273,7 @@ func (t *Manager) fetchResourceFile(rid string, peerFile resource.File, from str
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s: fetch %s: %s", rid, peerFile.Name, resp.Status)
return fmt.Errorf("unexpected api response status %s", resp.Status)
}

// Create a temp file and write the response body
Expand Down Expand Up @@ -221,6 +343,6 @@ func (t *Manager) onFetchDone(c cmdFetchDone) {
}
t.files.fetching = false
if t.files.attention != nil {
t.doHandleResourceFiles(t.files.attention)
t.handlePeerResourceFiles(t.files.attention)
}
}
24 changes: 4 additions & 20 deletions daemon/imon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/opensvc/om3/v3/core/object"
"github.com/opensvc/om3/v3/core/priority"
"github.com/opensvc/om3/v3/core/rawconfig"
"github.com/opensvc/om3/v3/core/resource"
"github.com/opensvc/om3/v3/core/status"
"github.com/opensvc/om3/v3/daemon/daemondata"
"github.com/opensvc/om3/v3/daemon/daemonenv"
Expand Down Expand Up @@ -72,7 +71,7 @@ type (
// or InstanceStatusUpdated.
instStatus map[string]instance.Status

files filesManager
files *filesManager

// instMonitor tracks instance.Monitor for path on other nodes, it is updated on
// ObjectStatusUpdated for path events where srcEvent is InstanceMonitorDeleted
Expand Down Expand Up @@ -170,20 +169,6 @@ type (
regularResourceOrchestrate orchestrationResource
}

filesManager struct {
// fetched stores the resource files we fetched to avoid unneeded refetch
fetched map[string]resource.File

// attention stores a pending InstanceStatusUpdated event received while the fetch
// manager was already processing an event. This serves as a flag to immediately
// retrigger a new fetch cycle upon completion of the current one.
attention *msgbus.InstanceStatusUpdated

// fetching is true when the resource files fetch and ingest routine is
// running
fetching bool
}

// cmdOrchestrate can be used from post action go routines
cmdOrchestrate struct {
state instance.MonitorState
Expand All @@ -201,7 +186,7 @@ type (
}

cmdFetchDone struct {
Files resource.Files
Files ridFiles
}

Factory struct {
Expand Down Expand Up @@ -256,9 +241,7 @@ func start(parent context.Context, qs pubsub.QueueSizer, p naming.Path, nodes []
cmdC: make(chan any),
databus: databus,
publisher: pubsub.PubFromContext(ctx),
files: filesManager{
fetched: make(map[string]resource.File),
},
files: newFilesManager(),
instStatus: make(map[string]instance.Status),
instMonitor: make(map[string]instance.Monitor),
nodeMonitor: make(map[string]node.Monitor),
Expand Down Expand Up @@ -385,6 +368,7 @@ func (t *Manager) worker(initialNodes []string) {

t.initRelationAvailStatus()
t.initResourceMonitor()
t.initLocalResourceFiles()
t.updateIsLeader()
t.updateIfChange()

Expand Down
Loading