diff --git a/CHANGELOG.md b/CHANGELOG.md index 833a532..e7a3ac7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ All notable changes to the SQLiteQ project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.2.1] - 2025-05-07 + +### Fixed + +- Added recovery functionality for unacknowledged queue items via `RequeueNoAckRows()` +- Added boolean column `ack` to track acknowledgment status +- Improved query performance with additional indexes +- Fixed issue with duplicate ack_id generation + ## [0.2.0] - 2025-05-06 ### Changed diff --git a/queue.go b/queue.go index b45fa89..4eb7d39 100644 --- a/queue.go +++ b/queue.go @@ -36,6 +36,8 @@ func newQueue(db *sql.DB, tableName string, opts ...Option) (*Queue, error) { return nil, fmt.Errorf("failed to initialize table: %w", err) } + q.RequeueNoAckRows() + return q, nil } @@ -47,16 +49,36 @@ func (q *Queue) initTable() error { data BLOB NOT NULL, status TEXT NOT NULL, ack_id TEXT UNIQUE, + ack BOOLEAN DEFAULT 0, created_at TIMESTAMP, updated_at TIMESTAMP ); CREATE INDEX IF NOT EXISTS %s_status_idx ON %s (status, created_at); - `, q.tableName, q.tableName, q.tableName) + CREATE INDEX IF NOT EXISTS %s_status_ack_idx ON %s (status, ack); + CREATE INDEX IF NOT EXISTS %s_ack_id_idx ON %s (ack_id); + `, q.tableName, q.tableName, q.tableName, q.tableName, q.tableName, q.tableName, q.tableName) _, err := q.client.Exec(createTableSQL) return err } +func (q *Queue) RequeueNoAckRows() { + tx, err := q.client.Begin() + + defer func() { + if err != nil { + tx.Rollback() + } + }() + + _, err = tx.Exec( + fmt.Sprintf("UPDATE %s SET status = 'pending', updated_at = ? WHERE status = 'processing' AND ack = 0", q.tableName), + time.Now().UTC(), + ) + + tx.Commit() +} + // Enqueue adds an item to the queue // It serializes the item to JSON and stores it in the database // Returns true if the operation was successful @@ -77,8 +99,8 @@ func (q *Queue) Enqueue(item any) bool { }() _, err = tx.Exec( - fmt.Sprintf("INSERT INTO %s (data, status, created_at, updated_at) VALUES (?, ?, ?, ?)", q.tableName), - item, "pending", now, now, + fmt.Sprintf("INSERT INTO %s (data, status, ack, created_at, updated_at) VALUES (?, ?, ?, ?, ?)", q.tableName), + item, "pending", 0, now, now, ) if err != nil { return false @@ -109,11 +131,24 @@ func (q *Queue) dequeueInternal(withAckId bool) (item any, success bool, ackID s // Get the oldest pending item var id int64 var data []byte + + // Only dequeue pending items in FIFO order row := tx.QueryRow(fmt.Sprintf( - "SELECT id, data FROM %s WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1", + "SELECT id, data, ack_id FROM %s WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1", q.tableName, )) - err = row.Scan(&id, &data) + + // Use NullString to handle NULL values from database + var nullAckID sql.NullString + + // Scan the row data + err = row.Scan(&id, &data, &nullAckID) // ackID may be NULL for pending items + + // Extract the string value if valid + if nullAckID.Valid { + ackID = nullAckID.String + } + if err != nil { if err == sql.ErrNoRows { tx.Rollback() @@ -123,17 +158,21 @@ func (q *Queue) dequeueInternal(withAckId bool) (item any, success bool, ackID s return nil, false, "" } - // Update the status to 'processing', with or without ack ID + // Update the status to 'processing' or delete the item, based on withAckId now := time.Now().UTC() + if withAckId { - ackID = cuid.New() + if ackID == "" { + ackID = cuid.New() + } + // Update the item to processing status _, err = tx.Exec( fmt.Sprintf("UPDATE %s SET status = 'processing', ack_id = ?, updated_at = ? WHERE id = ?", q.tableName), ackID, now, id, ) } else { - // remove the row if there is no ack + // For regular Dequeue, just delete the item immediately _, err = tx.Exec( fmt.Sprintf("DELETE FROM %s WHERE id = ?", q.tableName), id, @@ -185,15 +224,14 @@ func (q *Queue) Acknowledge(ackID string) bool { if q.removeOnComplete { // If removeOnComplete is true, delete the acknowledged item result, err = tx.Exec( - fmt.Sprintf("DELETE FROM %s WHERE ack_id = ? AND status = 'processing'", q.tableName), + fmt.Sprintf("DELETE FROM %s WHERE ack_id = ? ", q.tableName), ackID, ) } else { - // Otherwise, just mark it as completed - now := time.Now().UTC() + // Otherwise, mark it as completed and set ack to 1 (true in SQLite) result, err = tx.Exec( - fmt.Sprintf("UPDATE %s SET status = 'completed', updated_at = ? WHERE ack_id = ? AND status = 'processing'", q.tableName), - now, ackID, + fmt.Sprintf("UPDATE %s SET status = 'completed', ack = 1, updated_at = ? WHERE ack_id = ?", q.tableName), + time.Now().UTC(), ackID, ) } @@ -203,6 +241,7 @@ func (q *Queue) Acknowledge(ackID string) bool { } rowsAffected, err := result.RowsAffected() + if err != nil || rowsAffected == 0 { tx.Rollback() return false