diff --git a/core/channel.go b/core/channel.go index 4347bd1..1cbbada 100644 --- a/core/channel.go +++ b/core/channel.go @@ -123,31 +123,27 @@ func (c *channelImpl) Read() (buffer.ByteBuffer, error) { initSz := uint64(1024) for { - //之前未读完数据的切片 - head := c.inCache.GetRP() sz := c.inCache.AvailableReadSum() - bytes := c.inCache.FastMoveOut()[head : head+sz] - - //申请新的 buffer buf := c.alloc.Alloc(initSz) - //把之前数据的写入新的 buffer - if sz != 0 { - err = buf.Write(bytes) - if err != nil { - return c.alloc.Alloc(0), nil - } - } - //再将连接读到的数据写入新的 buffer - bytes = buf.FastMoveOut() - n, err = unix.Read(c.fd, bytes[sz:]) + n, err = unix.Read(c.fd, buf.FastMoveOut()[sz:]) if n == 0 || err != nil { + buf.Release() if err == unix.EAGAIN { return c.inCache, nil } return c.alloc.Alloc(0), errChannelClose } + head := c.inCache.GetRP() + bytes := c.inCache.FastMoveOut()[head : head+sz] + if sz != 0 { + err = buf.Write(bytes) + if err != nil { + return c.alloc.Alloc(0), nil + } + } //there is never index out of bound _ = buf.ShiftWN(uint64(n)) + c.inCache.Release() c.inCache = buf initSz <<= 1 } diff --git a/core/loop_group.go b/core/loop_group.go deleted file mode 100644 index c1a89dd..0000000 --- a/core/loop_group.go +++ /dev/null @@ -1,30 +0,0 @@ -package core - -type subLoopGroup struct { - loops []Loop - index int -} - -func (g *subLoopGroup) registe(lp Loop) { - g.loops = append(g.loops, lp) -} - -//for put new accept connection in subLoop load balance -func (g *subLoopGroup) next() Loop { - g.index++ - size := len(g.loops) - if g.index == size { - g.index -= size - //index 0 is mainLoop , so we should start with index 1 - g.index++ - } - return g.loops[g.index] -} - -func (g *subLoopGroup) iterate(f func(Loop) bool) { - for _, lp := range g.loops { - if !f(lp) { - break - } - } -} diff --git a/core/main_loop.go b/core/main_loop.go deleted file mode 100644 index decb579..0000000 --- a/core/main_loop.go +++ /dev/null @@ -1,32 +0,0 @@ -package core - -import "gunplan.top/concurrentNet/core/netpoll" - -type mainLoop struct { - poller *netpoll.Poller - channels map[int]ParentChannel -} - -func NewMainLoop() (*mainLoop, error) { - poller, err := netpoll.NewPoller() - if err != nil { - return nil, err - } - slp := &mainLoop{ - poller: poller, - channels: make(map[int]ParentChannel), - } - return slp, nil -} - -func (mp *mainLoop) start() { - -} - -func (mp *mainLoop) stop() { - -} - -func (mp *mainLoop) eventHandler() { - -} diff --git a/core/server.go b/core/server.go index ffce6ad..5869759 100644 --- a/core/server.go +++ b/core/server.go @@ -146,36 +146,4 @@ func (s *ServerImpl) startLoops() (err error) { return } return nil -} - -func (s *ServerImpl) startLoops() error { - var lps []Loop - cpuNum := runtime.NumCPU() - for i := 0; i < cpuNum; i++ { - slp, err := NewSubLoop() - if err != nil { - return err - } - lps = append(lps, slp) - } - mlp, err := NewMainLoop() - if err != nil { - return err - } - lps = append(lps, mlp) - - //To make mlp at the first of the loopGroup , when use iterate close loops , will close mlp first - for i := len(lps) - 1; i >= 0; i-- { - s.lg.registe(lps[i]) - } - - s.lg.iterate(func(lp Loop) bool { - s.wg.Add(1) - go func() { - lp.start() - s.wg.Done() - }() - return true - }) - return nil -} +} \ No newline at end of file diff --git a/core/sub_loop.go b/core/sub_loop.go deleted file mode 100644 index 1a4d221..0000000 --- a/core/sub_loop.go +++ /dev/null @@ -1,34 +0,0 @@ -package core - -import ( - "gunplan.top/concurrentNet/core/netpoll" -) - -type subLoop struct { - poller *netpoll.Poller - channels map[int]ChildChannel -} - -func NewSubLoop() (*subLoop, error) { - poller, err := netpoll.NewPoller() - if err != nil { - return nil, err - } - slp := &subLoop{ - poller: poller, - channels: make(map[int]ChildChannel), - } - return slp, nil -} - -func (slp *subLoop) start() { - -} - -func (slp *subLoop) stop() { - -} - -func (slp *subLoop) eventHandler() { - -}