Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ func (m *ResourceUsageMonitor) Run(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

log.Errorf("XYZ Resource Usage Monitor Run()")
defer log.Errorf("XYZ Resource Usage Monitor Run() exit")
// 1s, 2s, 4s, 8s, 16s, ..., m.maxSamplePeriod, m.maxSamplePeriod, m.maxSamplePeriod, ...
backoffPeriod := min(time.Second, m.maxSamplePeriod)
backoff := time.NewTimer(backoffPeriod)
Expand All @@ -297,6 +299,7 @@ func (m *ResourceUsageMonitor) Run(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Errorf("XYZ Resource Usage Monitor ctx done: %v (cause %v)", ctx.Err(), context.Cause(ctx))
return
case <-backoff.C:
backoffPeriod = min(backoffPeriod*2, m.maxSamplePeriod)
Expand All @@ -323,6 +326,7 @@ func (m *ResourceUsageMonitor) Run(ctx context.Context) {

err = AggregateResourceUsage(prevRU, currRU, ss, &aggRU, false)
if err != nil {
log.Errorf("XYZ Resource Usage Monitor: agg error: %v", err)
m.errorf("aggregation error: %v", err)
resetSamples()
continue
Expand All @@ -333,8 +337,10 @@ func (m *ResourceUsageMonitor) Run(ctx context.Context) {

// Sample size reached.
if numSamplesCollected == ss {
log.Errorf("XYZ Resource Usage Monitor: %v samples collected!", numSamplesCollected)
debugStatus, err := m.ruf.DebugStatusForPID(m.pid)
if err != nil {
log.Errorf("XYZ Resource Usage Monitor: fail to get debug status")
m.errorf("failed to get debug status for process[%d]: %v", m.pid, err)
}
rud := &mpb.ResourceUsageData{
Expand Down Expand Up @@ -393,13 +399,17 @@ func (m *ResourceUsageMonitor) errorf(format string, a ...any) {
}

// SendProtoToServer wraps a proto in a fspb.Message and sends it to the server.
func SendProtoToServer(ctx context.Context, pb proto.Message, msgType string, sc service.Context) error {
func SendProtoToServer(ctx context.Context, pb proto.Message, msgType string, sc service.Context) (sErr error) {
d, err := anypb.New(pb)
if err != nil {
return err
}
log.Errorf("XYZ SendProtoToServer()")
defer func() { log.Errorf("XYZ SendProtoToServer(): exit: %v", sErr) }()

ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

return sc.Send(ctx, service.AckMessage{
M: &fspb.Message{
Destination: &fspb.Address{ServiceName: "system"},
Expand Down
8 changes: 7 additions & 1 deletion fleetspeak/src/client/system_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ type fakeServiceContext struct {
}

func (sc fakeServiceContext) Send(ctx context.Context, m service.AckMessage) error {
log.Errorf("XYZ fake service context: Send(m) of %+v", m.M)
select {
case <-ctx.Done():
log.Errorf("XYZ fake service context: Send(m) context canceled: %v cause: %v", ctx.Err(), context.Cause(ctx))
return ctx.Err()
case sc.out <- m.M:
log.Errorf("XYZ fake service context: Send(m) OK")
return nil
}
}
Expand Down Expand Up @@ -98,7 +101,9 @@ func setUpTestEnv(t *testing.T) *testEnv {
wg.Add(1)
go func() {
defer wg.Done()
for range out {
log.Errorf("XYZ srv.Stop draining out chan")
for m := range out {
log.Errorf("XYZ Discarded message after stop: %+v", m)
}
}()

Expand Down Expand Up @@ -175,6 +180,7 @@ func TestStatsMsg(t *testing.T) {
for {
select {
case res := <-env.ctx.out:
log.Errorf("XYZ test res := <-env.ctx.out with res.MessageType=%q", res.MessageType)
if res.MessageType != "ResourceUsage" {
t.Errorf("Received message of unexpected type '%s'", res.MessageType)
continue
Expand Down