-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwaitinline.go
More file actions
158 lines (141 loc) · 3.38 KB
/
waitinline.go
File metadata and controls
158 lines (141 loc) · 3.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Package waitinline provides a fair, FIFO-style locking mechanism
// where goroutines acquire access in the order they request it.
package waitinline
import (
"context"
"errors"
)
var (
	// ErrQueueClosed is returned by Lock and Unlock when the Line has been closed.
	ErrQueueClosed = errors.New("queue closed")
	// ErrContextCancelled is returned by Lock when the caller's context is
	// done before the lock has been handed over.
	ErrContextCancelled = errors.New("context cancelled")
	// ErrFailedUnlock is returned by Unlock when the ticket's signal channel
	// cannot accept the release without blocking.
	ErrFailedUnlock = errors.New("unlock failed")
)
// LockHandler is the handle returned by a successful Line.Lock call.
// The holder must call Unlock when it is done so the lock can be handed to
// the next goroutine in line.
type LockHandler interface {
	// Unlock gives the lock back. A non-nil error means the release was not
	// performed.
	Unlock() error
}
// pass is the ticket a goroutine places in the Line's queue when it calls
// Lock. On success Lock returns the *pass to the caller as its LockHandler.
type pass struct {
	// signal carries the lock hand-off for this ticket: the dispatcher sends
	// on it to grant the lock, and Unlock sends on it when the holder
	// releases. Lock creates it with capacity 1.
	signal chan struct{}
	// ctx is derived from the context passed to Lock. The dispatcher skips,
	// or stops waiting on, a ticket whose ctx is done.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
	// queuectx is the owning Line's context; Unlock checks it to detect a
	// closed Line.
	queuectx context.Context
}
// Line is a fair lock: goroutines that call Lock are served one ticket at a
// time, in the order their tickets entered the queue. Create one with New
// and shut it down with Close.
type Line struct {
	// queue buffers the tickets waiting to be served; its capacity is the
	// size passed to New.
	queue chan *pass
	// ctx is cancelled by Close. Lock, Unlock and the dispatcher watch it to
	// detect shutdown.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
	// done is closed when the dispatcher goroutine (serve) returns.
	done chan struct{}
}
// New creates a Line whose queue can buffer up to size pending tickets and
// starts the dispatcher goroutine that serves them. Call Close to stop that
// goroutine once the Line is no longer needed.
//
// size is used directly as the queue channel's capacity, so a negative value
// panics. With size 0 the queue is unbuffered and a Lock call cannot enqueue
// its ticket until the dispatcher is ready to receive it.
func New(size int) *Line {
	line := new(Line)
	line.ctx, line.cancel = context.WithCancel(context.Background())
	line.queue = make(chan *pass, size)
	line.done = make(chan struct{})

	go line.serve()
	return line
}
// serve is the dispatcher goroutine started by New. It takes one ticket at a
// time from the queue, grants it the lock, and waits for that ticket to be
// released before looking at the next one, so waiters are served in the
// order their tickets entered the queue.
//
// The grant is a single send on the ticket's signal channel. The dispatcher
// never receives from that channel: release is observed through ticket.ctx,
// which Unlock cancels. Receiving on signal here would let the dispatcher
// take back its own grant whenever the waiter had not yet reached its
// receive, leaving that waiter blocked while the lock moved on.
//
// serve returns, closing l.done, once the Line's context is cancelled and
// every ticket still buffered in the queue has been cancelled.
func (l *Line) serve() {
	defer close(l.done)

	for {
		// Shutdown takes priority over serving: cancel whatever is still
		// buffered and stop.
		if l.ctx.Err() != nil {
			for {
				select {
				case t := <-l.queue:
					t.cancel()
				default:
					return
				}
			}
		}

		var ticket *pass
		select {
		case ticket = <-l.queue:
		case <-l.ctx.Done():
			continue // handled by the shutdown check at the top of the loop
		}

		// Skip tickets that are already dead: the Line closed while the
		// ticket sat in the queue, or its waiter gave up.
		select {
		case <-l.ctx.Done():
			ticket.cancel()
			continue
		case <-ticket.ctx.Done():
			continue
		default:
		}

		// Grant the lock. Lock creates signal with capacity 1 and nothing
		// has sent on this ticket yet, so the send does not block even if
		// the waiter has not reached its receive.
		ticket.signal <- struct{}{}

		// Hold the line until the ticket is released (Unlock cancels
		// ticket.ctx), abandoned (the caller's context ends), or the Line
		// is closed.
		select {
		case <-l.ctx.Done():
			ticket.cancel()
		case <-ticket.ctx.Done():
		}
	}
}

// Close shuts the Line down and blocks until the dispatcher goroutine has
// exited. Goroutines blocked in Lock return ErrQueueClosed, later Lock and
// Unlock calls fail with ErrQueueClosed, and the dispatcher cancels the
// ticket it is serving as well as every ticket still buffered in the queue.
// Calling Close more than once is safe.
func (l *Line) Close() {
	l.cancel()
	<-l.done
}

// Lock joins the line and blocks until the calling goroutine is granted the
// lock, ctx is done, or the Line is closed.
//
// On success it returns a LockHandler whose Unlock must be called to let the
// next waiter proceed. It returns ErrQueueClosed if the Line is closed and
// ErrContextCancelled if ctx ends before the lock is handed over; in both
// cases the returned handler is nil.
//
// ctx keeps governing the ticket after acquisition: if it is cancelled or
// times out while the lock is held, the dispatcher stops waiting for this
// holder and serves the next ticket. Pass a context that outlives the
// critical section.
func (l *Line) Lock(ctx context.Context) (LockHandler, error) {
	// Fail fast on a closed Line so no ticket is pushed into a queue that
	// nothing will read again.
	if l.ctx.Err() != nil {
		return nil, ErrQueueClosed
	}

	c, cancel := context.WithCancel(ctx)
	ticket := &pass{
		signal:   make(chan struct{}, 1),
		ctx:      c,
		cancel:   cancel,
		queuectx: l.ctx,
	}

	// Step 1: get the ticket into the queue.
	select {
	case <-l.ctx.Done():
		cancel()
		return nil, ErrQueueClosed
	case <-ctx.Done():
		cancel()
		return nil, ErrContextCancelled
	case l.queue <- ticket:
	}

	// Step 2: wait for the dispatcher's grant. On the error paths cancel
	// marks the ticket dead so the dispatcher skips it, or moves on if the
	// grant was already sent.
	select {
	case <-ticket.signal:
		return ticket, nil
	case <-l.ctx.Done():
		cancel()
		return nil, ErrQueueClosed
	case <-ctx.Done():
		cancel()
		return nil, ErrContextCancelled
	}
}

// Unlock releases the lock held through this ticket so the dispatcher can
// serve the next waiter.
//
// It returns ErrQueueClosed if the Line has been closed and ErrFailedUnlock
// if the ticket has already been unlocked; otherwise it returns nil.
func (p *pass) Unlock() error {
	if p.queuectx.Err() != nil {
		return ErrQueueClosed
	}

	// Lock emptied the signal slot when it received the grant and the
	// dispatcher never reads it, so the slot works as a one-shot "released"
	// marker: the first Unlock fills it, any later call finds it full.
	select {
	case p.signal <- struct{}{}:
		// Cancelling the ticket's context is what the dispatcher waits on.
		p.cancel()
		return nil
	default:
		return ErrFailedUnlock
	}
}