Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ All notable changes to the SQLiteQ project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.2.1] - 2025-05-07

### Fixed

- Added recovery functionality for unacknowledged queue items via `RequeueNoAckRows()`
- Added boolean column `ack` to track acknowledgment status
- Improved query performance with additional indexes
- Fixed issue with duplicate ack_id generation

## [0.2.0] - 2025-05-06

### Changed
Expand Down
65 changes: 52 additions & 13 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func newQueue(db *sql.DB, tableName string, opts ...Option) (*Queue, error) {
return nil, fmt.Errorf("failed to initialize table: %w", err)
}

q.RequeueNoAckRows()

return q, nil
}

Expand All @@ -47,16 +49,36 @@ func (q *Queue) initTable() error {
data BLOB NOT NULL,
status TEXT NOT NULL,
ack_id TEXT UNIQUE,
ack BOOLEAN DEFAULT 0,
created_at TIMESTAMP,
updated_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS %s_status_idx ON %s (status, created_at);
`, q.tableName, q.tableName, q.tableName)
CREATE INDEX IF NOT EXISTS %s_status_ack_idx ON %s (status, ack);
CREATE INDEX IF NOT EXISTS %s_ack_id_idx ON %s (ack_id);
`, q.tableName, q.tableName, q.tableName, q.tableName, q.tableName, q.tableName, q.tableName)

_, err := q.client.Exec(createTableSQL)
return err
}

func (q *Queue) RequeueNoAckRows() {
tx, err := q.client.Begin()

defer func() {
if err != nil {
tx.Rollback()
}
}()

_, err = tx.Exec(
fmt.Sprintf("UPDATE %s SET status = 'pending', updated_at = ? WHERE status = 'processing' AND ack = 0", q.tableName),
time.Now().UTC(),
)

tx.Commit()
}

// Enqueue adds an item to the queue
// It serializes the item to JSON and stores it in the database
// Returns true if the operation was successful
Expand All @@ -77,8 +99,8 @@ func (q *Queue) Enqueue(item any) bool {
}()

_, err = tx.Exec(
fmt.Sprintf("INSERT INTO %s (data, status, created_at, updated_at) VALUES (?, ?, ?, ?)", q.tableName),
item, "pending", now, now,
fmt.Sprintf("INSERT INTO %s (data, status, ack, created_at, updated_at) VALUES (?, ?, ?, ?, ?)", q.tableName),
item, "pending", 0, now, now,
)
if err != nil {
return false
Expand Down Expand Up @@ -109,11 +131,24 @@ func (q *Queue) dequeueInternal(withAckId bool) (item any, success bool, ackID s
// Get the oldest pending item
var id int64
var data []byte

// Only dequeue pending items in FIFO order
row := tx.QueryRow(fmt.Sprintf(
"SELECT id, data FROM %s WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1",
"SELECT id, data, ack_id FROM %s WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1",
q.tableName,
))
err = row.Scan(&id, &data)

// Use NullString to handle NULL values from database
var nullAckID sql.NullString

// Scan the row data
err = row.Scan(&id, &data, &nullAckID) // ackID may be NULL for pending items

// Extract the string value if valid
if nullAckID.Valid {
ackID = nullAckID.String
}

if err != nil {
if err == sql.ErrNoRows {
tx.Rollback()
Expand All @@ -123,17 +158,21 @@ func (q *Queue) dequeueInternal(withAckId bool) (item any, success bool, ackID s
return nil, false, ""
}

// Update the status to 'processing', with or without ack ID
// Update the status to 'processing' or delete the item, based on withAckId
now := time.Now().UTC()

if withAckId {
ackID = cuid.New()
if ackID == "" {
ackID = cuid.New()
}

// Update the item to processing status
_, err = tx.Exec(
fmt.Sprintf("UPDATE %s SET status = 'processing', ack_id = ?, updated_at = ? WHERE id = ?", q.tableName),
ackID, now, id,
)
} else {
// remove the row if there is no ack
// For regular Dequeue, just delete the item immediately
_, err = tx.Exec(
fmt.Sprintf("DELETE FROM %s WHERE id = ?", q.tableName),
id,
Expand Down Expand Up @@ -185,15 +224,14 @@ func (q *Queue) Acknowledge(ackID string) bool {
if q.removeOnComplete {
// If removeOnComplete is true, delete the acknowledged item
result, err = tx.Exec(
fmt.Sprintf("DELETE FROM %s WHERE ack_id = ? AND status = 'processing'", q.tableName),
fmt.Sprintf("DELETE FROM %s WHERE ack_id = ? ", q.tableName),
ackID,
)
} else {
// Otherwise, just mark it as completed
now := time.Now().UTC()
// Otherwise, mark it as completed and set ack to 1 (true in SQLite)
result, err = tx.Exec(
fmt.Sprintf("UPDATE %s SET status = 'completed', updated_at = ? WHERE ack_id = ? AND status = 'processing'", q.tableName),
now, ackID,
fmt.Sprintf("UPDATE %s SET status = 'completed', ack = 1, updated_at = ? WHERE ack_id = ?", q.tableName),
time.Now().UTC(), ackID,
)
}

Expand All @@ -203,6 +241,7 @@ func (q *Queue) Acknowledge(ackID string) bool {
}

rowsAffected, err := result.RowsAffected()

if err != nil || rowsAffected == 0 {
tx.Rollback()
return false
Expand Down