Skip to content

Commit d377b79

Browse files
tomerwellerclaude
andcommitted
Add order parameter to getEvents for descending order support
This adds an optional `order` field to the getEvents pagination options, allowing clients to retrieve events in descending order (newest first). Changes: - Update DB layer to accept order parameter for query direction - Update handler to use protocol.EventOrder types - Add tests for descending order, limit with descending, and invalid order This feature enables efficient querying of the N most recent events without scanning the entire retention window. The implementation is fully backwards compatible - order defaults to "asc" when not specified. Depends on: stellar/go-stellar-sdk#5888 Closes #575 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 45a1362 commit d377b79

7 files changed

Lines changed: 286 additions & 30 deletions

File tree

cmd/stellar-rpc/internal/db/event.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ type EventWriter interface {
3030
InsertEvents(lcm xdr.LedgerCloseMeta) error
3131
}
3232

33+
// EventOrder represents the order in which events are returned
34+
type EventOrder string
35+
36+
const (
37+
// EventOrderAsc returns events in ascending order (oldest first)
38+
EventOrderAsc EventOrder = "asc"
39+
// EventOrderDesc returns events in descending order (newest first)
40+
EventOrderDesc EventOrder = "desc"
41+
)
42+
3343
// EventReader has all the public methods to fetch events from DB
3444
type EventReader interface {
3545
GetEvents(
@@ -38,6 +48,7 @@ type EventReader interface {
3848
contractIDs [][]byte,
3949
topics NestedTopicArray,
4050
eventTypes []int,
51+
order EventOrder,
4152
f ScanFunction,
4253
) error
4354
}
@@ -292,7 +303,7 @@ func (eventHandler *eventHandler) trimEvents(latestLedgerSeq uint32, retentionWi
292303

293304
// GetEvents applies f on all the events occurring in the given range with
294305
// specified contract IDs if provided. The events are returned in sorted
295-
// ascending Cursor order.
306+
// order based on the order parameter (ascending or descending).
296307
//
297308
// If f returns false, the scan terminates early (f will not be applied on
298309
// remaining events in the range).
@@ -304,16 +315,23 @@ func (eventHandler *eventHandler) GetEvents(
304315
contractIDs [][]byte,
305316
topics NestedTopicArray,
306317
eventTypes []int,
318+
order EventOrder,
307319
scanner ScanFunction,
308320
) error {
309321
start := time.Now()
310322

323+
// Determine sort order
324+
orderDirection := "ASC"
325+
if order == EventOrderDesc {
326+
orderDirection = "DESC"
327+
}
328+
311329
rowQ := sq.
312330
Select("id", "event_data", "transaction_hash", "ledger_close_time").
313331
From(eventTableName).
314332
Where(sq.GtOrEq{"id": cursorRange.Start.String()}).
315333
Where(sq.Lt{"id": cursorRange.End.String()}).
316-
OrderBy("id ASC")
334+
OrderBy("id " + orderDirection)
317335

318336
if len(contractIDs) > 0 {
319337
rowQ = rowQ.Where(sq.Eq{"contract_id": contractIDs})

cmd/stellar-rpc/internal/db/event_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,6 @@ func TestInsertEvents(t *testing.T) {
201201
end := protocol.Cursor{Ledger: 100}
202202
cursorRange := protocol.CursorRange{Start: start, End: end}
203203

204-
err = eventReader.GetEvents(ctx, cursorRange, nil, nil, nil, nil)
204+
err = eventReader.GetEvents(ctx, cursorRange, nil, nil, nil, EventOrderAsc, nil)
205205
require.NoError(t, err)
206206
}

cmd/stellar-rpc/internal/db/transaction_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func TestTransactionFound(t *testing.T) {
229229
end := protocol.Cursor{Ledger: 1000}
230230
cursorRange := protocol.CursorRange{Start: start, End: end}
231231

232-
err = eventReader.GetEvents(ctx, cursorRange, nil, nil, nil, nil)
232+
err = eventReader.GetEvents(ctx, cursorRange, nil, nil, nil, EventOrderAsc, nil)
233233
require.NoError(t, err)
234234

235235
// check all 200 cases

cmd/stellar-rpc/internal/methods/get_events.go

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,30 @@ func (h eventsRPCHandler) getEvents(ctx context.Context, request protocol.GetEve
117117
}
118118
}
119119

120+
order := protocol.EventOrderAsc
121+
if request.Pagination != nil && request.Pagination.Order != "" {
122+
order = request.Pagination.Order
123+
}
124+
isDescending := order == protocol.EventOrderDesc
125+
120126
start := protocol.Cursor{Ledger: request.StartLedger}
121127
limit := h.defaultLimit
122128
if request.Pagination != nil {
123129
if request.Pagination.Cursor != nil {
124130
start = *request.Pagination.Cursor
125-
// increment event index because, when paginating, we start with the
126-
// item right after the cursor
127-
start.Event++
131+
// Adjust cursor for pagination based on order direction
132+
if isDescending {
133+
// For descending order, we move backwards from the cursor
134+
if start.Event > 0 {
135+
start.Event--
136+
} else {
137+
// Need to move to previous tx/op/ledger
138+
start = decrementCursor(start)
139+
}
140+
} else {
141+
// For ascending order, we move forward from the cursor
142+
start.Event++
143+
}
128144
}
129145
if request.Pagination.Limit > 0 {
130146
limit = request.Pagination.Limit
@@ -179,7 +195,13 @@ func (h eventsRPCHandler) getEvents(ctx context.Context, request protocol.GetEve
179195
return uint(len(found)) < limit
180196
}
181197

182-
err = h.dbReader.GetEvents(ctx, cursorRange, contractIDs, topics, eventTypes, eventScanFunction)
198+
// Convert order to db.EventOrder
199+
dbOrder := db.EventOrderAsc
200+
if isDescending {
201+
dbOrder = db.EventOrderDesc
202+
}
203+
204+
err = h.dbReader.GetEvents(ctx, cursorRange, contractIDs, topics, eventTypes, dbOrder, eventScanFunction)
183205
if err != nil {
184206
return protocol.GetEventsResponse{}, &jrpc2.Error{
185207
Code: jrpc2.InvalidRequest, Message: err.Error(),
@@ -209,9 +231,15 @@ func (h eventsRPCHandler) getEvents(ctx context.Context, request protocol.GetEve
209231
// cursor represents end of the search window if events does not reach limit
210232
// here endLedger is always exclusive when fetching events
211233
// so search window is max Cursor value with endLedger - 1
212-
maxCursor := protocol.MaxCursor
213-
maxCursor.Ledger = endLedger - 1
214-
cursor = maxCursor.String()
234+
if isDescending {
235+
// For descending order, the cursor represents the start of the search window
236+
minCursor := protocol.Cursor{Ledger: start.Ledger}
237+
cursor = minCursor.String()
238+
} else {
239+
maxCursor := protocol.MaxCursor
240+
maxCursor.Ledger = endLedger - 1
241+
cursor = maxCursor.String()
242+
}
215243
}
216244

217245
return protocol.GetEventsResponse{
@@ -225,6 +253,23 @@ func (h eventsRPCHandler) getEvents(ctx context.Context, request protocol.GetEve
225253
}, nil
226254
}
227255

256+
// decrementCursor decrements the cursor to the previous position
257+
func decrementCursor(c protocol.Cursor) protocol.Cursor {
258+
// If we're at the minimum cursor for this ledger, we can't go further back
259+
// The cursor will remain at position 0,0,0 for the ledger
260+
if c.Event == 0 && c.Op == 0 && c.Tx == 0 {
261+
return c
262+
}
263+
// Set to the maximum possible cursor value to capture all earlier events
264+
// This effectively means "everything before this cursor in this ledger"
265+
return protocol.Cursor{
266+
Ledger: c.Ledger,
267+
Tx: c.Tx,
268+
Op: c.Op,
269+
Event: 0, // The DB query will handle the rest with DESC ordering
270+
}
271+
}
272+
228273
func eventInfoForEvent(
229274
event xdr.DiagnosticEvent,
230275
cursor protocol.Cursor,

cmd/stellar-rpc/internal/methods/get_events_test.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,6 +1012,199 @@ func TestGetEvents(t *testing.T) {
10121012
results,
10131013
)
10141014
})
1015+
1016+
t.Run("with descending order", func(t *testing.T) {
1017+
dbx := newTestDB(t)
1018+
ctx := context.TODO()
1019+
log := log.DefaultLogger
1020+
log.SetLevel(logrus.TraceLevel)
1021+
1022+
writer := db.NewReadWriter(log, dbx, interfaces.MakeNoOpDeamon(), 10, 10, passphrase)
1023+
write, err := writer.NewTx(ctx)
1024+
require.NoError(t, err)
1025+
1026+
ledgerW, eventW := write.LedgerWriter(), write.EventWriter()
1027+
store := db.NewEventReader(log, dbx, passphrase)
1028+
1029+
contractID := xdr.ContractId([32]byte{})
1030+
var txMeta []xdr.TransactionMeta
1031+
for i := range 5 {
1032+
number := xdr.Uint64(i)
1033+
txMeta = append(txMeta, transactionMetaWithEvents(
1034+
contractEvent(
1035+
contractID,
1036+
xdr.ScVec{
1037+
xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter},
1038+
},
1039+
xdr.ScVal{Type: xdr.ScValTypeScvU64, U64: &number},
1040+
),
1041+
))
1042+
}
1043+
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
1044+
require.NoError(t, ledgerW.InsertLedger(ledgerCloseMeta), "ingestion failed for ledger")
1045+
require.NoError(t, eventW.InsertEvents(ledgerCloseMeta), "ingestion failed for events")
1046+
require.NoError(t, write.Commit(ledgerCloseMeta, nil))
1047+
1048+
handler := eventsRPCHandler{
1049+
dbReader: store,
1050+
maxLimit: 10000,
1051+
defaultLimit: 100,
1052+
ledgerReader: db.NewLedgerReader(dbx),
1053+
}
1054+
1055+
// Test descending order returns events in reverse order
1056+
results, err := handler.getEvents(ctx, protocol.GetEventsRequest{
1057+
StartLedger: 1,
1058+
Pagination: &protocol.PaginationOptions{
1059+
Order: protocol.EventOrderDesc,
1060+
},
1061+
})
1062+
require.NoError(t, err)
1063+
require.Len(t, results.Events, 5)
1064+
1065+
// Verify events are returned in descending order (tx 5, 4, 3, 2, 1)
1066+
for i, event := range results.Events {
1067+
expectedTxIndex := uint32(5 - i)
1068+
assert.Equal(t, expectedTxIndex, event.TxIndex,
1069+
"event %d should have TxIndex %d", i, expectedTxIndex)
1070+
}
1071+
})
1072+
1073+
t.Run("descending order with limit", func(t *testing.T) {
1074+
dbx := newTestDB(t)
1075+
ctx := context.TODO()
1076+
log := log.DefaultLogger
1077+
log.SetLevel(logrus.TraceLevel)
1078+
1079+
writer := db.NewReadWriter(log, dbx, interfaces.MakeNoOpDeamon(), 10, 10, passphrase)
1080+
write, err := writer.NewTx(ctx)
1081+
require.NoError(t, err)
1082+
1083+
ledgerW, eventW := write.LedgerWriter(), write.EventWriter()
1084+
store := db.NewEventReader(log, dbx, passphrase)
1085+
1086+
contractID := xdr.ContractId([32]byte{})
1087+
var txMeta []xdr.TransactionMeta
1088+
for i := range 10 {
1089+
number := xdr.Uint64(i)
1090+
txMeta = append(txMeta, transactionMetaWithEvents(
1091+
contractEvent(
1092+
contractID,
1093+
xdr.ScVec{
1094+
xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter},
1095+
},
1096+
xdr.ScVal{Type: xdr.ScValTypeScvU64, U64: &number},
1097+
),
1098+
))
1099+
}
1100+
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
1101+
require.NoError(t, ledgerW.InsertLedger(ledgerCloseMeta), "ingestion failed for ledger")
1102+
require.NoError(t, eventW.InsertEvents(ledgerCloseMeta), "ingestion failed for events")
1103+
require.NoError(t, write.Commit(ledgerCloseMeta, nil))
1104+
1105+
handler := eventsRPCHandler{
1106+
dbReader: store,
1107+
maxLimit: 10000,
1108+
defaultLimit: 100,
1109+
ledgerReader: db.NewLedgerReader(dbx),
1110+
}
1111+
1112+
// Test descending order with limit returns the N newest events
1113+
results, err := handler.getEvents(ctx, protocol.GetEventsRequest{
1114+
StartLedger: 1,
1115+
Pagination: &protocol.PaginationOptions{
1116+
Limit: 3,
1117+
Order: protocol.EventOrderDesc,
1118+
},
1119+
})
1120+
require.NoError(t, err)
1121+
require.Len(t, results.Events, 3)
1122+
1123+
// Should return the 3 newest events (tx 10, 9, 8 in that order)
1124+
assert.Equal(t, uint32(10), results.Events[0].TxIndex)
1125+
assert.Equal(t, uint32(9), results.Events[1].TxIndex)
1126+
assert.Equal(t, uint32(8), results.Events[2].TxIndex)
1127+
})
1128+
1129+
t.Run("invalid order parameter", func(t *testing.T) {
1130+
dbx := newTestDB(t)
1131+
log := log.DefaultLogger
1132+
log.SetLevel(logrus.TraceLevel)
1133+
store := db.NewEventReader(log, dbx, passphrase)
1134+
1135+
handler := eventsRPCHandler{
1136+
dbReader: store,
1137+
maxLimit: 10000,
1138+
defaultLimit: 100,
1139+
ledgerReader: db.NewLedgerReader(dbx),
1140+
}
1141+
1142+
_, err := handler.getEvents(context.TODO(), protocol.GetEventsRequest{
1143+
StartLedger: 1,
1144+
Pagination: &protocol.PaginationOptions{
1145+
Order: "invalid",
1146+
},
1147+
})
1148+
require.Error(t, err)
1149+
require.Contains(t, err.Error(), "order must be 'asc' or 'desc'")
1150+
})
1151+
1152+
t.Run("ascending order explicitly set", func(t *testing.T) {
1153+
dbx := newTestDB(t)
1154+
ctx := context.TODO()
1155+
log := log.DefaultLogger
1156+
log.SetLevel(logrus.TraceLevel)
1157+
1158+
writer := db.NewReadWriter(log, dbx, interfaces.MakeNoOpDeamon(), 10, 10, passphrase)
1159+
write, err := writer.NewTx(ctx)
1160+
require.NoError(t, err)
1161+
1162+
ledgerW, eventW := write.LedgerWriter(), write.EventWriter()
1163+
store := db.NewEventReader(log, dbx, passphrase)
1164+
1165+
contractID := xdr.ContractId([32]byte{})
1166+
var txMeta []xdr.TransactionMeta
1167+
for i := range 5 {
1168+
number := xdr.Uint64(i)
1169+
txMeta = append(txMeta, transactionMetaWithEvents(
1170+
contractEvent(
1171+
contractID,
1172+
xdr.ScVec{
1173+
xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counter},
1174+
},
1175+
xdr.ScVal{Type: xdr.ScValTypeScvU64, U64: &number},
1176+
),
1177+
))
1178+
}
1179+
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
1180+
require.NoError(t, ledgerW.InsertLedger(ledgerCloseMeta), "ingestion failed for ledger")
1181+
require.NoError(t, eventW.InsertEvents(ledgerCloseMeta), "ingestion failed for events")
1182+
require.NoError(t, write.Commit(ledgerCloseMeta, nil))
1183+
1184+
handler := eventsRPCHandler{
1185+
dbReader: store,
1186+
maxLimit: 10000,
1187+
defaultLimit: 100,
1188+
ledgerReader: db.NewLedgerReader(dbx),
1189+
}
1190+
1191+
// Test explicitly set ascending order
1192+
results, err := handler.getEvents(ctx, protocol.GetEventsRequest{
1193+
StartLedger: 1,
1194+
Pagination: &protocol.PaginationOptions{
1195+
Order: protocol.EventOrderAsc,
1196+
},
1197+
})
1198+
require.NoError(t, err)
1199+
require.Len(t, results.Events, 5)
1200+
1201+
// Verify events are returned in ascending order (tx 1, 2, 3, 4, 5)
1202+
for i, event := range results.Events {
1203+
expectedTxIndex := uint32(i + 1)
1204+
assert.Equal(t, expectedTxIndex, event.TxIndex,
1205+
"event %d should have TxIndex %d", i, expectedTxIndex)
1206+
}
1207+
})
10151208
}
10161209

10171210
func BenchmarkGetEvents(b *testing.B) {

go.mod

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,14 @@ require (
129129
go.opentelemetry.io/otel/metric v1.38.0 // indirect
130130
go.opentelemetry.io/otel/trace v1.38.0 // indirect
131131
go.uber.org/multierr v1.11.0 // indirect
132-
golang.org/x/crypto v0.43.0 // indirect
132+
golang.org/x/crypto v0.45.0 // indirect
133133
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect
134134
golang.org/x/mod v0.29.0
135-
golang.org/x/net v0.46.0 // indirect
135+
golang.org/x/net v0.47.0 // indirect
136136
golang.org/x/oauth2 v0.32.0 // indirect
137-
golang.org/x/sync v0.17.0 // indirect
138-
golang.org/x/sys v0.37.0 // indirect
139-
golang.org/x/text v0.30.0 // indirect
137+
golang.org/x/sync v0.18.0 // indirect
138+
golang.org/x/sys v0.38.0 // indirect
139+
golang.org/x/text v0.31.0 // indirect
140140
golang.org/x/time v0.14.0 // indirect
141141
google.golang.org/api v0.254.0 // indirect
142142
google.golang.org/genproto v0.0.0-20251029180050-ab9386a59fda // indirect
@@ -150,3 +150,5 @@ require (
150150
gopkg.in/tylerb/graceful.v1 v1.2.15 // indirect
151151
gopkg.in/yaml.v3 v3.0.1 // indirect
152152
)
153+
154+
replace github.com/stellar/go-stellar-sdk => ../go-stellar-sdk

0 commit comments

Comments
 (0)