diff --git a/connwrap.go b/connwrap.go index 9a0bdf4..0c92626 100644 --- a/connwrap.go +++ b/connwrap.go @@ -14,6 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !windows // +build !windows package grpcfd @@ -21,9 +22,11 @@ package grpcfd import ( "context" "fmt" + "log" "net" "os" "runtime" + "runtime/debug" "syscall" "github.com/edwarnicke/serialize" @@ -170,7 +173,11 @@ func (w *connWrap) Write(b []byte) (int, error) { } func (w *connWrap) SendFD(fd uintptr) <-chan error { - errCh := make(chan error, 1) + log.Default().Println("grpcfd: SendFD start: " + fmt.Sprint(goid())) + debug.PrintStack() + defer log.Default().Println("grpcfd: SendFD end: " + fmt.Sprint(goid())) + + errCh := make(chan error, 10) // Dup the fd because we have no way of knowing what the caller will do with it between // now and when we can send it fd, _, err := syscall.Syscall(syscall.SYS_FCNTL, fd, uintptr(syscall.F_DUPFD), 0) @@ -187,7 +194,11 @@ func (w *connWrap) SendFD(fd uintptr) <-chan error { } func (w *connWrap) SendFile(file SyscallConn) <-chan error { - errCh := make(chan error, 1) + log.Default().Println("grpcfd: SendFile start: " + fmt.Sprint(goid())) + debug.PrintStack() + defer log.Default().Println("grpcfd: SendFile end: " + fmt.Sprint(goid())) + + errCh := make(chan error, 10) raw, err := file.SyscallConn() if err != nil { errCh <- errors.Wrapf(err, "unable to retrieve syscall.RawConn for src %+v", file) @@ -202,16 +213,17 @@ func (w *connWrap) SendFile(file SyscallConn) <-chan error { close(errCh) return } - go func(errChIn <-chan error, errChOut chan<- error) { - for err := range errChIn { - errChOut <- err - } - close(errChOut) - }(w.SendFD(fd), errCh) + go joinErrChs(w.SendFD(fd), errCh) }) + if err != nil { - errCh <- err - close(errCh) + // Return a separate channel to not conflict with goroutine from the raw.Control + // As an alternative, mutex can be used, but it can affect performance. + // + // In some cases, errCh can't be closed, but it's fine. https://groups.google.com/g/golang-nuts/c/pZwdYRGxCIk/m/qpbHxRRPJdUJ + var resCh = make(chan error, 1) + resCh <- err + return resCh } return errCh } @@ -229,7 +241,7 @@ func (w *connWrap) String() string { } func (w *connWrap) RecvFD(dev, ino uint64) <-chan uintptr { - fdCh := make(chan uintptr, 1) + fdCh := make(chan uintptr, 10) w.recvExecutor.AsyncExec(func() { key := inodeKey{ dev: dev, @@ -266,7 +278,7 @@ func (w *connWrap) RecvFDByURL(urlStr string) (<-chan uintptr, error) { } func (w *connWrap) RecvFile(dev, ino uint64) <-chan *os.File { - fileCh := make(chan *os.File, 1) + fileCh := make(chan *os.File, 10) go func(fdCh <-chan uintptr, fileCh chan<- *os.File) { for fd := range fdCh { if runtime.GOOS == "linux" { @@ -352,14 +364,16 @@ func (w *connWrap) Read(b []byte) (n int, err error) { } // FromPeer - return grpcfd.FDTransceiver from peer.Peer -// ok is true of successful, false otherwise +// +// ok is true of successful, false otherwise func FromPeer(p *peer.Peer) (transceiver FDTransceiver, ok bool) { transceiver, ok = p.Addr.(FDTransceiver) return transceiver, ok } // FromContext - return grpcfd.FDTransceiver from context.Context -// ok is true of successful, false otherwise +// +// ok is true of successful, false otherwise func FromContext(ctx context.Context) (transceiver FDTransceiver, ok bool) { p, ok := peer.FromContext(ctx) if !ok { diff --git a/connwrap_linux.go b/connwrap_linux.go index 5a1f7a8..00712d7 100644 --- a/connwrap_linux.go +++ b/connwrap_linux.go @@ -14,18 +14,24 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build linux // +build linux package grpcfd import ( + "fmt" + "log" "os" "golang.org/x/sys/unix" ) func (w *connWrap) SendFilename(filename string) <-chan error { - errCh := make(chan error, 1) + log.Default().Println("grpcfd: SendFilename start" + fmt.Sprint(goid())) + defer log.Default().Println("grpcfd: SendFilename end" + fmt.Sprint(goid())) + + errCh := make(chan error, 10) file, err := os.OpenFile(filename, unix.O_PATH, 0) // #nosec if err != nil { errCh <- err @@ -33,15 +39,10 @@ func (w *connWrap) SendFilename(filename string) <-chan error { return errCh } go func(errChIn <-chan error, errChOut chan<- error) { - for err := range errChIn { - errChOut <- err - } - err := file.Close() - if err != nil { - errChOut <- err - } - close(errChOut) + log.Default().Println("grpcfd: SendFilename goroutine start: " + fmt.Sprint(goid())) + defer log.Default().Println("grpcfd: SendFilename end: " + fmt.Sprint(goid())) + joinErrChs(errChIn, errChOut) + _ = file.Close() }(w.SendFile(file), errCh) - _ = file.Close() return errCh } diff --git a/connwrap_notlinux.go b/connwrap_notlinux.go index 613ae1b..f45ac84 100644 --- a/connwrap_notlinux.go +++ b/connwrap_notlinux.go @@ -14,6 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !linux && !windows // +build !linux,!windows package grpcfd @@ -23,7 +24,7 @@ import ( ) func (w *connWrap) SendFilename(filename string) <-chan error { - errCh := make(chan error, 1) + errCh := make(chan error, 10) // Note: this will fail in most cases for 'unopenable' files (like unix file sockets). See use of O_PATH in connwrap_linux.go for // the trick that makes this work in Linux file, err := os.Open(filename) // #nosec @@ -33,15 +34,8 @@ func (w *connWrap) SendFilename(filename string) <-chan error { return errCh } go func(errChIn <-chan error, errChOut chan<- error) { - for err := range errChIn { - errChOut <- err - } - err := file.Close() - if err != nil { - errChOut <- err - } - close(errChOut) + joinErrChs(errChIn, errChOut) + _ = file.Close() }(w.SendFile(file), errCh) - _ = file.Close() return errCh } diff --git a/doc.go b/doc.go index 48daf9c..bfeab1c 100644 --- a/doc.go +++ b/doc.go @@ -17,3 +17,21 @@ // Package grpcfd provides a TransportCredential that can wrap other TransportCredentials and cause the // peer.Addr to be a FDSender or FDRecver such that it can send or receive files over unix file sockets (if available). package grpcfd + +import ( + "fmt" + "runtime" + "strconv" + "strings" +) + +func goid() int { + var buf [64]byte + n := runtime.Stack(buf[:], false) + idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0] + id, err := strconv.Atoi(idField) + if err != nil { + panic(fmt.Sprintf("cannot get goroutine id: %v", err)) + } + return id +} diff --git a/go.mod b/go.mod index 59abe4b..d447a88 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/edwarnicke/grpcfd +module github.com/denis-tingaikin/grpcfd go 1.14 diff --git a/per_rpc_transport_credentials.go b/per_rpc_transport_credentials.go index da4bec0..20f0679 100644 --- a/per_rpc_transport_credentials.go +++ b/per_rpc_transport_credentials.go @@ -14,12 +14,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !windows // +build !windows package grpcfd import ( context "context" + "fmt" + "log" "os" "github.com/edwarnicke/serialize" @@ -61,7 +64,7 @@ func (w *wrapPerRPCCredentials) RequireTransportSecurity() bool { } func (w *wrapPerRPCCredentials) SendFD(fd uintptr) <-chan error { - out := make(chan error, 1) + out := make(chan error, 10) w.executor.AsyncExec(func() { if w.FDTransceiver != nil { go joinErrChs(w.FDTransceiver.SendFD(fd), out) @@ -75,7 +78,7 @@ func (w *wrapPerRPCCredentials) SendFD(fd uintptr) <-chan error { } func (w *wrapPerRPCCredentials) SendFile(file SyscallConn) <-chan error { - out := make(chan error, 1) + out := make(chan error, 10) w.executor.AsyncExec(func() { if w.FDTransceiver != nil { go joinErrChs(w.FDTransceiver.SendFile(file), out) @@ -89,7 +92,7 @@ func (w *wrapPerRPCCredentials) SendFile(file SyscallConn) <-chan error { } func (w *wrapPerRPCCredentials) RecvFD(dev, inode uint64) <-chan uintptr { - out := make(chan uintptr, 1) + out := make(chan uintptr, 10) w.executor.AsyncExec(func() { if w.FDTransceiver != nil { go joinFDChs(w.FDTransceiver.RecvFD(dev, inode), out) @@ -103,7 +106,7 @@ func (w *wrapPerRPCCredentials) RecvFD(dev, inode uint64) <-chan uintptr { } func (w *wrapPerRPCCredentials) RecvFile(dev, ino uint64) <-chan *os.File { - out := make(chan *os.File, 1) + out := make(chan *os.File, 10) w.executor.AsyncExec(func() { if w.FDTransceiver != nil { go joinFileChs(w.FDTransceiver.RecvFile(dev, ino), out) @@ -121,7 +124,7 @@ func (w *wrapPerRPCCredentials) RecvFileByURL(urlStr string) (<-chan *os.File, e if err != nil { return nil, err } - out := make(chan *os.File, 1) + out := make(chan *os.File, 10) w.executor.AsyncExec(func() { if w.FDTransceiver != nil { go joinFileChs(w.FDTransceiver.RecvFile(dev, ino), out) @@ -139,7 +142,7 @@ func (w *wrapPerRPCCredentials) RecvFDByURL(urlStr string) (<-chan uintptr, erro if err != nil { return nil, err } - out := make(chan uintptr, 1) + out := make(chan uintptr, 10) w.executor.AsyncExec(func() { if w.FDTransceiver != nil { go joinFDChs(w.FDTransceiver.RecvFD(dev, ino), out) @@ -153,6 +156,8 @@ func (w *wrapPerRPCCredentials) RecvFDByURL(urlStr string) (<-chan uintptr, erro } func joinErrChs(in <-chan error, out chan<- error) { + log.Default().Println("grpcfd: joinErrChs start: " + fmt.Sprint(goid())) + defer log.Default().Println("grpcfd: joinErrChs end: " + fmt.Sprint(goid())) for err := range in { out <- err } @@ -160,6 +165,8 @@ func joinErrChs(in <-chan error, out chan<- error) { } func joinFileChs(in <-chan *os.File, out chan<- *os.File) { + log.Default().Println("grpcfd: joinFileChs start: " + fmt.Sprint(goid())) + defer log.Default().Println("grpcfd: joinFileChs end: " + fmt.Sprint(goid())) for file := range in { out <- file } @@ -167,6 +174,9 @@ func joinFileChs(in <-chan *os.File, out chan<- *os.File) { } func joinFDChs(in <-chan uintptr, out chan<- uintptr) { + log.Default().Println("grpcfd: joinFDChs start: " + fmt.Sprint(goid())) + defer log.Default().Println("grpcfd: joinFDChs end: " + fmt.Sprint(goid())) + for fd := range in { out <- fd } @@ -195,7 +205,8 @@ func PerRPCCredentialsFromCallOptions(opts ...grpc.CallOption) credentials.PerRP } // FromPerRPCCredentials - return grpcfd.FDTransceiver from credentials.PerRPCCredentials -// ok is true of successful, false otherwise +// +// ok is true of successful, false otherwise func FromPerRPCCredentials(rpcCredentials credentials.PerRPCCredentials) (transceiver FDTransceiver, ok bool) { if transceiver, ok = rpcCredentials.(FDTransceiver); ok { return transceiver, true diff --git a/per_rpc_transport_credentials_linux.go b/per_rpc_transport_credentials_linux.go index fca1abe..5b76738 100644 --- a/per_rpc_transport_credentials_linux.go +++ b/per_rpc_transport_credentials_linux.go @@ -14,6 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build linux // +build linux package grpcfd @@ -25,7 +26,7 @@ import ( ) func (w *wrapPerRPCCredentials) SendFilename(filename string) <-chan error { - out := make(chan error, 1) + out := make(chan error, 10) file, err := os.OpenFile(filename, unix.O_PATH, 0) // #nosec if err != nil { out <- err diff --git a/per_rpc_transport_credentials_notlinux.go b/per_rpc_transport_credentials_notlinux.go index e2de30e..6730070 100644 --- a/per_rpc_transport_credentials_notlinux.go +++ b/per_rpc_transport_credentials_notlinux.go @@ -14,6 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !linux && !windows // +build !linux,!windows package grpcfd @@ -21,7 +22,7 @@ package grpcfd import "os" func (w *wrapPerRPCCredentials) SendFilename(filename string) <-chan error { - out := make(chan error, 1) + out := make(chan error, 10) // Note: this will fail in most cases for 'unopenable' files (like unix file sockets). See use of O_PATH in per_rpc_transport_credentials_linux.go for // the trick that makes this work in Linux file, err := os.Open(filename) // #nosec