Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ func newPriorityQueue(db *sql.DB, tableName string, opts ...Option) (*PriorityQu
func (pq *PriorityQueue) initPriorityColumn() error {
// Check if priority column exists
var name string
err := pq.client.QueryRow(fmt.Sprintf("PRAGMA table_info(%s)", pq.tableName)).Scan(nil, &name, nil, nil, nil, nil)
err := pq.client.QueryRow(fmt.Sprintf("PRAGMA table_info(%s)", quoteIdent(pq.tableName))).Scan(nil, &name, nil, nil, nil, nil)

if err != nil || name != "priority" {
// Add priority column with default value 0
_, err := pq.client.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN priority INTEGER NOT NULL DEFAULT 0", pq.tableName))
_, err := pq.client.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN priority INTEGER NOT NULL DEFAULT 0", quoteIdent(pq.tableName)))
if err != nil {
return err
}

// Create index on priority (ASC for lower numbers = higher priority)
_, err = pq.client.Exec(fmt.Sprintf("CREATE INDEX IF NOT EXISTS %s_priority_idx ON %s (priority ASC, created_at ASC)", pq.tableName, pq.tableName))
_, err = pq.client.Exec(fmt.Sprintf("CREATE INDEX IF NOT EXISTS %s ON %s (priority ASC, created_at ASC)", quoteIdent(pq.tableName+"_priority_idx"), quoteIdent(pq.tableName)))
if err != nil {
return err
}
Expand Down Expand Up @@ -75,7 +75,7 @@ func (pq *PriorityQueue) Enqueue(item any, priority int) bool {
}()

_, err = tx.Exec(
fmt.Sprintf("INSERT INTO %s (data, status, created_at, updated_at, priority) VALUES (?, ?, ?, ?, ?)", pq.tableName),
fmt.Sprintf("INSERT INTO %s (data, status, created_at, updated_at, priority) VALUES (?, ?, ?, ?, ?)", quoteIdent(pq.tableName)),
item, "pending", now, now, priority,
)
if err != nil {
Expand Down Expand Up @@ -108,7 +108,7 @@ func (pq *PriorityQueue) dequeueInternal(withAckId bool) (any, bool, string) {
var data []byte
row := tx.QueryRow(fmt.Sprintf(
"SELECT id, data FROM %s WHERE status = 'pending' ORDER BY priority ASC, created_at ASC LIMIT 1",
pq.tableName,
quoteIdent(pq.tableName),
))
err = row.Scan(&id, &data)
if err != nil {
Expand All @@ -128,13 +128,13 @@ func (pq *PriorityQueue) dequeueInternal(withAckId bool) (any, bool, string) {
ackID = cuid.New()

_, err = tx.Exec(
fmt.Sprintf("UPDATE %s SET status = 'processing', ack_id = ?, updated_at = ? WHERE id = ?", pq.tableName),
fmt.Sprintf("UPDATE %s SET status = 'processing', ack_id = ?, updated_at = ? WHERE id = ?", quoteIdent(pq.tableName)),
ackID, now, id,
)
} else {
// remove the row if there is no ack
_, err = tx.Exec(
fmt.Sprintf("DELETE FROM %s WHERE id = ?", pq.tableName),
fmt.Sprintf("DELETE FROM %s WHERE id = ?", quoteIdent(pq.tableName)),
id,
)
}
Expand Down
9 changes: 0 additions & 9 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sqliteq
import (
"database/sql"
"fmt"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -317,11 +316,3 @@ func (q *Queue) Close() error {

return nil
}

// Applies quotes to an identifier escaping any internal quotes.
// See: https://www.sqlite.org/lang_keywords.html
func quoteIdent(name string) string {
// Replace quotes with dobule quotes
escaped := strings.ReplaceAll(name, `"`, `""`)
return `"` + escaped + `"`
}
11 changes: 11 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package sqliteq

import "strings"

// Applies quotes to an identifier escaping any internal quotes.
// See: https://www.sqlite.org/lang_keywords.html
func quoteIdent(name string) string {
// Replace quotes with dobule quotes
escaped := strings.ReplaceAll(name, `"`, `""`)
return `"` + escaped + `"`
}