From c4f32ef3bc3fd5f1fad1beee382ab57bb75e5b1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E9=94=85=E9=A5=AD?= <1156544355@qq.com> Date: Fri, 28 Nov 2025 00:39:06 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E2=9C=A8=E6=94=B9=E6=88=90=E9=9D=9E?= =?UTF-8?q?=E5=A0=B5=E5=A1=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- event_channel.go | 65 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 54 insertions(+), 11 deletions(-) diff --git a/event_channel.go b/event_channel.go index 64d64602..75f4507e 100644 --- a/event_channel.go +++ b/event_channel.go @@ -1,5 +1,7 @@ package zero +import "time" + // FutureEvent 是 ZeroBot 交互式的核心,用于异步获取指定事件 type FutureEvent struct { Type string @@ -34,7 +36,8 @@ func (m *Matcher) FutureEvent(typ string, rule ...Rule) *FutureEvent { // // 该 chan 必须接收,如需手动取消监听,请使用 Repeat 方法 func (n *FutureEvent) Next() <-chan *Ctx { - ch := make(chan *Ctx, 1) + // 稍微增加一点缓冲,防止极端情况死锁 + ch := make(chan *Ctx, 5) StoreTempMatcher(&Matcher{ Type: Type(n.Type), Block: n.Block, @@ -42,7 +45,13 @@ func (n *FutureEvent) Next() <-chan *Ctx { Rules: n.Rule, Engine: defaultEngine, Handler: func(ctx *Ctx) { - ch <- ctx + // 使用 select 防止阻塞,虽然 Next 只取一次,但非阻塞是好习惯 + select { + case ch <- ctx: + default: + } + // Next 是一次性的,发送完最好关闭,但由 StoreTempMatcher 机制决定 + // 这里不手动关闭 ch,避免多次触发 panic,让调用方接收 close(ch) }, }) @@ -53,10 +62,16 @@ func (n *FutureEvent) Next() <-chan *Ctx { // // 如果没有取消监听,将不断监听指定事件 func (n *FutureEvent) Repeat() (recv <-chan *Ctx, cancel func()) { - ch, done := make(chan *Ctx, 1), make(chan struct{}) + // 【核心修改1】大幅增加缓冲区 + // 100 的缓冲区足够应对你在处理图片(1~3秒)期间群里的消息爆发 + ch := make(chan *Ctx, 100) + done := make(chan struct{}) + go func() { defer close(ch) - in := make(chan *Ctx, 1) + // 内部通道也给足缓冲 + in := make(chan *Ctx, 100) + matcher := StoreMatcher(&Matcher{ Type: Type(n.Type), Block: n.Block, @@ -64,22 +79,44 @@ func (n *FutureEvent) Repeat() (recv <-chan *Ctx, cancel func()) { Rules: n.Rule, Engine: defaultEngine, Handler: func(ctx *Ctx) { - in <- ctx + // 【核心修改2】非阻塞写入 + // 如果 consumer 处理太慢导致 in 满了,这里会走 default 分支 + // 从而“丢弃”消息,而不是“卡死”整个 Bot 引擎 + select { + case in <- ctx: + default: + // 缓冲区已满,丢弃该消息以保护主进程 + } }, }) + + // 确保退出时清理 Matcher + defer matcher.Delete() + for { select { case e := <-in: - ch <- e + // 将消息转发给用户通道 + select { + case ch <- e: + case <-done: + // 收到外部取消信号,退出 + return + } case <-done: - matcher.Delete() - close(in) + // 收到外部取消信号,退出 return } } }() + return ch, func() { - close(done) + // 防止多次调用 cancel 导致 panic + select { + case <-done: + default: + close(done) + } } } @@ -91,10 +128,16 @@ func (n *FutureEvent) Take(num int) <-chan *Ctx { ch := make(chan *Ctx, num) go func() { defer close(ch) + defer cancel() // 确保任务完成后取消监听 for i := 0; i < num; i++ { - ch <- <-recv + select { + case e := <-recv: + ch <- e + case <-time.After(time.Minute * 10): + // 加上一个超长超时防止彻底泄露(可选优化) + return + } } - cancel() }() return ch } From 54ccde3ed8f734146a26bf9173930e219b9e3609 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E9=94=85=E9=A5=AD?= <1156544355@qq.com> Date: Fri, 28 Nov 2025 17:45:20 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E2=9C=A8=E4=BB=85=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E5=AE=B9=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- event_channel.go | 105 +++++++++++++++-------------------------------- 1 file changed, 32 insertions(+), 73 deletions(-) diff --git a/event_channel.go b/event_channel.go index 75f4507e..433cc8a9 100644 --- a/event_channel.go +++ b/event_channel.go @@ -1,7 +1,5 @@ package zero -import "time" - // FutureEvent 是 ZeroBot 交互式的核心,用于异步获取指定事件 type FutureEvent struct { Type string @@ -36,8 +34,7 @@ func (m *Matcher) FutureEvent(typ string, rule ...Rule) *FutureEvent { // // 该 chan 必须接收,如需手动取消监听,请使用 Repeat 方法 func (n *FutureEvent) Next() <-chan *Ctx { - // 稍微增加一点缓冲,防止极端情况死锁 - ch := make(chan *Ctx, 5) + ch := make(chan *Ctx, 1) StoreTempMatcher(&Matcher{ Type: Type(n.Type), Block: n.Block, @@ -45,14 +42,12 @@ func (n *FutureEvent) Next() <-chan *Ctx { Rules: n.Rule, Engine: defaultEngine, Handler: func(ctx *Ctx) { - // 使用 select 防止阻塞,虽然 Next 只取一次,但非阻塞是好习惯 - select { - case ch <- ctx: - default: - } - // Next 是一次性的,发送完最好关闭,但由 StoreTempMatcher 机制决定 - // 这里不手动关闭 ch,避免多次触发 panic,让调用方接收 - close(ch) + // 使用 go func 异步发送,确保不阻塞主线程 + go func() { + defer func() { recover() }() + ch <- ctx + close(ch) + }() }, }) return ch @@ -62,61 +57,31 @@ func (n *FutureEvent) Next() <-chan *Ctx { // // 如果没有取消监听,将不断监听指定事件 func (n *FutureEvent) Repeat() (recv <-chan *Ctx, cancel func()) { - // 【核心修改1】大幅增加缓冲区 - // 100 的缓冲区足够应对你在处理图片(1~3秒)期间群里的消息爆发 + // 【修改点1】保留扩容到 100,应对突发消息 ch := make(chan *Ctx, 100) - done := make(chan struct{}) - - go func() { - defer close(ch) - // 内部通道也给足缓冲 - in := make(chan *Ctx, 100) - - matcher := StoreMatcher(&Matcher{ - Type: Type(n.Type), - Block: n.Block, - Priority: n.Priority, - Rules: n.Rule, - Engine: defaultEngine, - Handler: func(ctx *Ctx) { - // 【核心修改2】非阻塞写入 - // 如果 consumer 处理太慢导致 in 满了,这里会走 default 分支 - // 从而“丢弃”消息,而不是“卡死”整个 Bot 引擎 - select { - case in <- ctx: - default: - // 缓冲区已满,丢弃该消息以保护主进程 - } - }, - }) - - // 确保退出时清理 Matcher - defer matcher.Delete() - - for { - select { - case e := <-in: - // 将消息转发给用户通道 - select { - case ch <- e: - case <-done: - // 收到外部取消信号,退出 - return - } - case <-done: - // 收到外部取消信号,退出 - return - } - } - }() + matcher := StoreMatcher(&Matcher{ + Type: Type(n.Type), + Block: n.Block, + Priority: n.Priority, + Rules: n.Rule, + Engine: defaultEngine, + Handler: func(ctx *Ctx) { + // 【修改点2】使用 go func 异步发送 + // 只要业务层(Consumer)处理速度基本正常,这就不会阻塞 Bot 核心 + // 即使业务层处理慢,消息也会先堆积在 goroutine 中,而不会卡死主线程 + go func() { + // 防止 ch 被 close 后写入导致 panic + defer func() { recover() }() + ch <- ctx + }() + }, + }) return ch, func() { - // 防止多次调用 cancel 导致 panic - select { - case <-done: - default: - close(done) - } + matcher.Delete() + // 防止多次调用 cancel 导致关闭已关闭通道的 panic + defer func() { recover() }() + close(ch) } } @@ -128,16 +93,10 @@ func (n *FutureEvent) Take(num int) <-chan *Ctx { ch := make(chan *Ctx, num) go func() { defer close(ch) - defer cancel() // 确保任务完成后取消监听 for i := 0; i < num; i++ { - select { - case e := <-recv: - ch <- e - case <-time.After(time.Minute * 10): - // 加上一个超长超时防止彻底泄露(可选优化) - return - } + ch <- <-recv } + cancel() }() return ch -} +} \ No newline at end of file From 854add1aab66f9c200ff04212416cc1e2cae0e64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E9=94=85=E9=A5=AD?= <1156544355@qq.com> Date: Fri, 28 Nov 2025 17:45:20 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E2=9C=8F=EF=B8=8F=E4=BF=AElint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- event_channel.go | 105 +++++++++++++++-------------------------------- 1 file changed, 32 insertions(+), 73 deletions(-) diff --git a/event_channel.go b/event_channel.go index 75f4507e..1958bc2d 100644 --- a/event_channel.go +++ b/event_channel.go @@ -1,7 +1,5 @@ package zero -import "time" - // FutureEvent 是 ZeroBot 交互式的核心,用于异步获取指定事件 type FutureEvent struct { Type string @@ -36,8 +34,7 @@ func (m *Matcher) FutureEvent(typ string, rule ...Rule) *FutureEvent { // // 该 chan 必须接收,如需手动取消监听,请使用 Repeat 方法 func (n *FutureEvent) Next() <-chan *Ctx { - // 稍微增加一点缓冲,防止极端情况死锁 - ch := make(chan *Ctx, 5) + ch := make(chan *Ctx, 1) StoreTempMatcher(&Matcher{ Type: Type(n.Type), Block: n.Block, @@ -45,14 +42,12 @@ func (n *FutureEvent) Next() <-chan *Ctx { Rules: n.Rule, Engine: defaultEngine, Handler: func(ctx *Ctx) { - // 使用 select 防止阻塞,虽然 Next 只取一次,但非阻塞是好习惯 - select { - case ch <- ctx: - default: - } - // Next 是一次性的,发送完最好关闭,但由 StoreTempMatcher 机制决定 - // 这里不手动关闭 ch,避免多次触发 panic,让调用方接收 - close(ch) + // 使用 go func 异步发送,确保不阻塞主线程 + go func() { + defer func() { _ = recover() }() + ch <- ctx + close(ch) + }() }, }) return ch @@ -62,61 +57,31 @@ func (n *FutureEvent) Next() <-chan *Ctx { // // 如果没有取消监听,将不断监听指定事件 func (n *FutureEvent) Repeat() (recv <-chan *Ctx, cancel func()) { - // 【核心修改1】大幅增加缓冲区 - // 100 的缓冲区足够应对你在处理图片(1~3秒)期间群里的消息爆发 + // 【修改点1】保留扩容到 100,应对突发消息 ch := make(chan *Ctx, 100) - done := make(chan struct{}) - - go func() { - defer close(ch) - // 内部通道也给足缓冲 - in := make(chan *Ctx, 100) - - matcher := StoreMatcher(&Matcher{ - Type: Type(n.Type), - Block: n.Block, - Priority: n.Priority, - Rules: n.Rule, - Engine: defaultEngine, - Handler: func(ctx *Ctx) { - // 【核心修改2】非阻塞写入 - // 如果 consumer 处理太慢导致 in 满了,这里会走 default 分支 - // 从而“丢弃”消息,而不是“卡死”整个 Bot 引擎 - select { - case in <- ctx: - default: - // 缓冲区已满,丢弃该消息以保护主进程 - } - }, - }) - - // 确保退出时清理 Matcher - defer matcher.Delete() - - for { - select { - case e := <-in: - // 将消息转发给用户通道 - select { - case ch <- e: - case <-done: - // 收到外部取消信号,退出 - return - } - case <-done: - // 收到外部取消信号,退出 - return - } - } - }() + matcher := StoreMatcher(&Matcher{ + Type: Type(n.Type), + Block: n.Block, + Priority: n.Priority, + Rules: n.Rule, + Engine: defaultEngine, + Handler: func(ctx *Ctx) { + // 【修改点2】使用 go func 异步发送 + // 只要业务层(Consumer)处理速度基本正常,这就不会阻塞 Bot 核心 + // 即使业务层处理慢,消息也会先堆积在 goroutine 中,而不会卡死主线程 + go func() { + // 防止 ch 被 close 后写入导致 panic + defer func() { _ = recover() }() + ch <- ctx + }() + }, + }) return ch, func() { - // 防止多次调用 cancel 导致 panic - select { - case <-done: - default: - close(done) - } + matcher.Delete() + // 防止多次调用 cancel 导致关闭已关闭通道的 panic + defer func() { _ = recover() }() + close(ch) } } @@ -128,16 +93,10 @@ func (n *FutureEvent) Take(num int) <-chan *Ctx { ch := make(chan *Ctx, num) go func() { defer close(ch) - defer cancel() // 确保任务完成后取消监听 for i := 0; i < num; i++ { - select { - case e := <-recv: - ch <- e - case <-time.After(time.Minute * 10): - // 加上一个超长超时防止彻底泄露(可选优化) - return - } + ch <- <-recv } + cancel() }() return ch -} +} \ No newline at end of file From 2e65f29da0d2a2a4212c025112d87bcbe36bb038 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E9=94=85=E9=A5=AD?= <1156544355@qq.com> Date: Fri, 28 Nov 2025 17:55:39 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E2=9C=8F=EF=B8=8F=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E5=90=88=E5=B9=B6=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- event_channel.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/event_channel.go b/event_channel.go index 5c665234..1958bc2d 100644 --- a/event_channel.go +++ b/event_channel.go @@ -34,7 +34,6 @@ func (m *Matcher) FutureEvent(typ string, rule ...Rule) *FutureEvent { // // 该 chan 必须接收,如需手动取消监听,请使用 Repeat 方法 func (n *FutureEvent) Next() <-chan *Ctx { - ch := make(chan *Ctx, 1) ch := make(chan *Ctx, 1) StoreTempMatcher(&Matcher{ Type: Type(n.Type), @@ -58,7 +57,6 @@ func (n *FutureEvent) Next() <-chan *Ctx { // // 如果没有取消监听,将不断监听指定事件 func (n *FutureEvent) Repeat() (recv <-chan *Ctx, cancel func()) { - // 【修改点1】保留扩容到 100,应对突发消息 // 【修改点1】保留扩容到 100,应对突发消息 ch := make(chan *Ctx, 100) matcher := StoreMatcher(&Matcher{ @@ -98,9 +96,6 @@ func (n *FutureEvent) Take(num int) <-chan *Ctx { for i := 0; i < num; i++ { ch <- <-recv } - cancel() - ch <- <-recv - } cancel() }() return ch From 706e6edd0bc8b821ed72827fa1813b81ae09838e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E9=94=85=E9=A5=AD?= <1156544355@qq.com> Date: Fri, 28 Nov 2025 18:11:59 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E2=9C=A8=E4=BF=9D=E6=8C=81=E5=8E=9F?= =?UTF-8?q?=E6=9D=A5=E7=9A=84done=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- event_channel.go | 55 +++++++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/event_channel.go b/event_channel.go index 1958bc2d..d725d97d 100644 --- a/event_channel.go +++ b/event_channel.go @@ -57,31 +57,38 @@ func (n *FutureEvent) Next() <-chan *Ctx { // // 如果没有取消监听,将不断监听指定事件 func (n *FutureEvent) Repeat() (recv <-chan *Ctx, cancel func()) { - // 【修改点1】保留扩容到 100,应对突发消息 - ch := make(chan *Ctx, 100) - matcher := StoreMatcher(&Matcher{ - Type: Type(n.Type), - Block: n.Block, - Priority: n.Priority, - Rules: n.Rule, - Engine: defaultEngine, - Handler: func(ctx *Ctx) { - // 【修改点2】使用 go func 异步发送 - // 只要业务层(Consumer)处理速度基本正常,这就不会阻塞 Bot 核心 - // 即使业务层处理慢,消息也会先堆积在 goroutine 中,而不会卡死主线程 - go func() { - // 防止 ch 被 close 后写入导致 panic - defer func() { _ = recover() }() - ch <- ctx - }() - }, - }) - + // 保留扩容到 100,应对突发消息 + ch, done := make(chan *Ctx, 100), make(chan struct{}) + go func() { + defer close(ch) + in := make(chan *Ctx, 1) + matcher := StoreMatcher(&Matcher{ + Type: Type(n.Type), + Block: n.Block, + Priority: n.Priority, + Rules: n.Rule, + Engine: defaultEngine, + Handler: func(ctx *Ctx) { + // 只要 Consumer 处理不是极度滞后,这种方式就能防止 Bot 核心被阻塞 + go func() { + defer func() { _ = recover() }() + in <- ctx + }() + }, + }) + for { + select { + case e := <-in: + ch <- e + case <-done: + matcher.Delete() + close(in) + return + } + } + }() return ch, func() { - matcher.Delete() - // 防止多次调用 cancel 导致关闭已关闭通道的 panic - defer func() { _ = recover() }() - close(ch) + close(done) } } From ed2914b8592805cbab1a9412dcf90e48a087922b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E9=94=85=E9=A5=AD?= <1156544355@qq.com> Date: Fri, 28 Nov 2025 19:22:01 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E2=9C=A8ctrl=20s=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- event_channel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/event_channel.go b/event_channel.go index d725d97d..677ff297 100644 --- a/event_channel.go +++ b/event_channel.go @@ -106,4 +106,4 @@ func (n *FutureEvent) Take(num int) <-chan *Ctx { cancel() }() return ch -} \ No newline at end of file +}