From 8ac9d9fe5cc82c4493c71b01f4d223fff157712a Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 24 Jan 2018 14:20:34 +0000 Subject: [PATCH] Feed the stdout and stderr of the Pilot subprocess into glog Fixes: #166 --- pkg/pilot/cassandra/v3/pilot.go | 3 -- pkg/pilot/elasticsearch/v5/process.go | 2 - .../genericpilot/processmanager/process.go | 50 ++++++++++++++++++- 3 files changed, 49 insertions(+), 6 deletions(-) diff --git a/pkg/pilot/cassandra/v3/pilot.go b/pkg/pilot/cassandra/v3/pilot.go index 9656a5632..278d81932 100644 --- a/pkg/pilot/cassandra/v3/pilot.go +++ b/pkg/pilot/cassandra/v3/pilot.go @@ -2,7 +2,6 @@ package v3 import ( "fmt" - "os" "os/exec" "k8s.io/client-go/tools/cache" @@ -56,8 +55,6 @@ func (p *Pilot) CmdFunc(pilot *v1alpha1.Pilot) (*exec.Cmd, error) { // The /run.sh script is unique to gcr.io/google-samples/cassandra:v12. // TODO: Add support for other Cassandra images with different entry points. cmd := exec.Command("/run.sh") - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr return cmd, nil } diff --git a/pkg/pilot/elasticsearch/v5/process.go b/pkg/pilot/elasticsearch/v5/process.go index b37548b33..acc249b0a 100644 --- a/pkg/pilot/elasticsearch/v5/process.go +++ b/pkg/pilot/elasticsearch/v5/process.go @@ -14,8 +14,6 @@ func (p *Pilot) CmdFunc(pilot *v1alpha1.Pilot) (*exec.Cmd, error) { } cmd := exec.Command("elasticsearch") - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr cmd.Env = p.env().Strings() return cmd, nil diff --git a/pkg/pilot/genericpilot/processmanager/process.go b/pkg/pilot/genericpilot/processmanager/process.go index 7419e2f7e..746f10785 100644 --- a/pkg/pilot/genericpilot/processmanager/process.go +++ b/pkg/pilot/genericpilot/processmanager/process.go @@ -1,9 +1,13 @@ package processmanager import ( + "bufio" "fmt" "os" "os/exec" + "sync" + + "github.com/golang/glog" ) type Interface interface { @@ -41,13 +45,56 @@ type adapter struct { doneCh chan struct{} doneErr error + wg sync.WaitGroup } var _ Interface = &adapter{} +func (p *adapter) startCommandOutputLoggers() error { + stdout, err := p.cmd.StdoutPipe() + if err != nil { + return err + } + p.wg.Add(1) + go func() { + defer p.wg.Done() + in := bufio.NewScanner(stdout) + for in.Scan() { + glog.Infoln(in.Text()) + } + err := in.Err() + if err != nil { + glog.Error(err) + } + }() + + stderr, err := p.cmd.StderrPipe() + if err != nil { + return err + } + p.wg.Add(1) + go func() { + defer p.wg.Done() + in := bufio.NewScanner(stderr) + for in.Scan() { + glog.Errorln(in.Text()) + } + err := in.Err() + if err != nil { + glog.Error(err) + } + }() + return nil +} + // Start will start the underlying subprocess func (p *adapter) Start() error { - if err := p.cmd.Start(); err != nil { + err := p.startCommandOutputLoggers() + if err != nil { + return err + } + + if err = p.cmd.Start(); err != nil { return fmt.Errorf("error starting process: %s", err.Error()) } go p.startWait() @@ -71,6 +118,7 @@ func (p *adapter) Stop() error { // If the subprocess has not been started yet, the returned chan will // not close until the subprocess has been started and then stopped. func (p *adapter) Wait() <-chan struct{} { + defer p.wg.Wait() return p.doneCh }