Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:

strategy:
matrix:
node-version: [18.x, 20.x, 22.x, 24.x]
node-version: [20.x, 22.x, 24.x]

steps:
- name: Checkout Repository
Expand Down
8 changes: 4 additions & 4 deletions lib/AbstractCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ export abstract class AbstractCache<
this.notificationConsumer.setTargetCache(this.inMemoryCache)
this.initPromises.push(
this.notificationConsumer.subscribe().catch((err) => {
/* c8 ignore next 1 */
/* v8 ignore next -- @preserve */
this.notificationConsumer!.errorHandler(err, this.notificationConsumer!.serverUuid, this.logger)
}),
)
Expand All @@ -148,7 +148,7 @@ export abstract class AbstractCache<
this.notificationPublisher = config.notificationPublisher
this.initPromises.push(
this.notificationPublisher.subscribe().catch((err) => {
/* c8 ignore next 1 */
/* v8 ignore next -- @preserve */
this.notificationPublisher!.errorHandler(err, this.notificationPublisher!.channel, this.logger)
}),
)
Expand Down Expand Up @@ -184,7 +184,7 @@ export abstract class AbstractCache<
await this.asyncCache.close()
}

/* c8 ignore start */
/* v8 ignore next -- @preserve */
if (this.notificationConsumer) {
try {
await this.notificationConsumer.close()
Expand All @@ -193,6 +193,7 @@ export abstract class AbstractCache<
this.logger.error(`Failed to close notification consumer: ${err.message}`)
}
}
/* v8 ignore next -- @preserve */
if (this.notificationPublisher) {
try {
await this.notificationPublisher.close()
Expand All @@ -201,6 +202,5 @@ export abstract class AbstractCache<
this.logger.error(`Failed to close notification publisher: ${err.message}`)
}
}
/* c8 ignore stop */
}
}
4 changes: 2 additions & 2 deletions lib/AbstractFlatCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export abstract class AbstractFlatCache<LoadedValue, LoadParams = string, LoadMa
public async invalidateCacheForMany(keys: string[]) {
if (this.asyncCache) {
await this.asyncCache.deleteMany(keys).catch((err) => {
/* c8 ignore next 1 */
/* v8 ignore next -- @preserve */
this.cacheUpdateErrorHandler(err, undefined, this.asyncCache!, this.logger)
})
}
Expand All @@ -161,7 +161,7 @@ export abstract class AbstractFlatCache<LoadedValue, LoadParams = string, LoadMa

if (this.notificationPublisher) {
this.notificationPublisher.deleteMany(keys).catch((err) => {
/* c8 ignore next 1 */
/* v8 ignore next -- @preserve */
this.notificationPublisher!.errorHandler(err, this.notificationPublisher!.channel, this.logger)
})
}
Expand Down
10 changes: 5 additions & 5 deletions lib/Loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,19 @@ export class Loader<LoadedValue, LoadParams = string, LoadManyParams = LoadParam

public async forceSetValue(key: string, newValue: LoadedValue | null) {
this.inMemoryCache.set(key, newValue)
/* v8 ignore next 3 */
/* v8 ignore next -- @preserve */
if (this.runningLoads.has(key)) {
this.runningLoads.delete(key)
}

if (this.asyncCache) {
await this.asyncCache.set(key, newValue).catch((err) => {
/* v8 ignore next 1 */
/* v8 ignore next -- @preserve */
this.cacheUpdateErrorHandler(err, key, this.asyncCache!, this.logger)
})
}

/* v8 ignore next 5 */
/* v8 ignore next -- @preserve */
if (this.notificationPublisher) {
this.notificationPublisher.set(key, newValue).catch((err) => {
this.notificationPublisher!.errorHandler(err, this.notificationPublisher!.channel, this.logger)
Expand All @@ -97,14 +97,14 @@ export class Loader<LoadedValue, LoadParams = string, LoadManyParams = LoadParam
if (finalValue !== undefined) {
this.inMemoryCache.set(key, finalValue)

/* v8 ignore next 3 */
/* v8 ignore next -- @preserve */
if (this.runningLoads.has(key)) {
this.runningLoads.delete(key)
}
}

// In order to keep other cluster nodes in-sync with potentially changed entry, we force them to refresh too
/* v8 ignore next 5 */
/* v8 ignore next -- @preserve */
if (this.notificationPublisher) {
this.notificationPublisher.delete(key).catch((err) => {
this.notificationPublisher!.errorHandler(err, this.notificationPublisher!.channel, this.logger)
Expand Down
2 changes: 1 addition & 1 deletion lib/notifications/AbstractNotificationConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { Logger } from '../util/Logger'

export type ConsumerErrorHandler = (err: Error, channel: string, logger: Logger) => void

/* c8 ignore next 3 */
/* v8 ignore next -- @preserve */
export const DEFAULT_NOTIFICATION_ERROR_HANDLER: ConsumerErrorHandler = (err, serverUuid, logger) => {
logger.error(`Notification consumer error for server UUID ${serverUuid}: ${err.message}`)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/redis/RedisExpirationTimeDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class RedisExpirationTimeDataSource implements DataSource<number, string>
return this.parentAsyncCache.getExpirationTime(key)
}

/* c8 ignore next 3 */
/* v8 ignore next -- @preserve */
getMany(): Promise<number[]> {
throw new Error('Not supported')
}
Expand Down
2 changes: 1 addition & 1 deletion lib/redis/RedisExpirationTimeGroupDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class RedisExpirationTimeGroupDataSource implements GroupDataSource<numbe
return this.parentAsyncCache.getExpirationTimeFromGroup(key, group)
}

/* c8 ignore next 3 */
/* v8 ignore next -- @preserve */
getManyFromGroup(): Promise<number[]> {
throw new Error('Not supported')
}
Expand Down
6 changes: 5 additions & 1 deletion lib/redis/RedisGroupNotificationConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ export class RedisGroupNotificationConsumer<LoadedValue> extends AbstractNotific
}

async close(): Promise<void> {
await this.redis.unsubscribe(this.channel)
try {
await this.redis.unsubscribe(this.channel)
} catch {
// Connection may already be closed
}
return new Promise((resolve) => {
void this.redis.quit((_err, result) => {
return resolve()
Expand Down
14 changes: 9 additions & 5 deletions lib/redis/RedisNotificationConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ export class RedisNotificationConsumer<LoadedValue> extends AbstractNotification
}

async close(): Promise<void> {
await this.redis.unsubscribe(this.channel)
try {
await this.redis.unsubscribe(this.channel)
} catch {
// Connection may already be closed
}
return new Promise((resolve) => {
void this.redis.quit((_err, result) => {
return resolve()
})
void this.redis.quit((_err, result) => {
return resolve()
})
}
})
}

subscribe(): Promise<void> {
return this.redis.subscribe(this.channel).then(() => {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@
"devDependencies": {
"@biomejs/biome": "^1.9.4",
"@types/node": "^22.15.29",
"@vitest/coverage-v8": "^3.2.0",
"@vitest/coverage-v8": "^4.0.15",
"del-cli": "^7.0.0",
"rfdc": "^1.4.1",
"vitest": "^3.2.4",
"vitest": "^4.0.15",
"typescript": "^5.9.3"
},
"files": ["README.md", "LICENSE", "dist/*"]
Expand Down
47 changes: 47 additions & 0 deletions test/redis/RedisGroupNotificationPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,45 @@ describe('RedisGroupNotificationPublisher', () => {
await operation.invalidateCacheForGroup('group')
})

it('Ignores unknown action IDs', async () => {
const { publisher: notificationPublisher1, consumer: notificationConsumer1 } =
createGroupNotificationPair({
channel: CHANNEL_ID,
consumerRedis: redisConsumer,
publisherRedis: redisPublisher,
})

const operation = new GroupLoader({
inMemoryCache: IN_MEMORY_CACHE_CONFIG,
asyncCache: new DummyGroupedCache(userValues),
notificationConsumer: notificationConsumer1,
notificationPublisher: notificationPublisher1,
})
await operation.init()

await operation.getAsyncOnly('key', 'group')
const resultPre = operation.getInMemoryOnly('key', 'group')
expect(resultPre).toEqual(user1)

// Publish unknown action ID directly
await redisPublisher.publish(
CHANNEL_ID,
JSON.stringify({
actionId: 'UNKNOWN_ACTION',
originUuid: 'different-uuid',
}),
)

await setTimeout(50)

// Cache should remain unchanged
const resultPost = operation.getInMemoryOnly('key', 'group')
expect(resultPost).toEqual(user1)

await notificationConsumer1.close()
await notificationPublisher1.close()
})

it('Handles error by default', async () => {
expect.assertions(1)
const { publisher: notificationPublisher, consumer: notificationConsumer } =
Expand All @@ -392,6 +431,10 @@ describe('RedisGroupNotificationPublisher', () => {
})

await operation.invalidateCacheFor('key', 'group')

await operation.close()
await notificationConsumer.close()
await notificationPublisher.close()
})

it('Handles connection error on delete', async () => {
Expand Down Expand Up @@ -422,5 +465,9 @@ describe('RedisGroupNotificationPublisher', () => {

await setTimeout(1)
await setTimeout(1)

await operation.close()
await notificationConsumer.close()
await notificationPublisher.close()
})
})
42 changes: 42 additions & 0 deletions test/redis/RedisNotificationPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,44 @@ describe('RedisNotificationPublisher', () => {
await setTimeout(1)
})

it('Ignores unknown action IDs', async () => {
const { publisher: notificationPublisher1, consumer: notificationConsumer1 } =
createNotificationPair<string>({
channel: CHANNEL_ID,
consumerRedis: redisConsumer,
publisherRedis: redisPublisher,
})

const operation = new Loader<string>({
inMemoryCache: IN_MEMORY_CACHE_CONFIG,
asyncCache: new DummyCache('value'),
notificationConsumer: notificationConsumer1,
notificationPublisher: notificationPublisher1,
})
await operation.init()

const resultPre = await operation.get('key')
expect(resultPre).toBe('value')

// Publish unknown action ID directly
await redisPublisher.publish(
CHANNEL_ID,
JSON.stringify({
actionId: 'UNKNOWN_ACTION',
originUuid: 'different-uuid',
}),
)

await setTimeout(50)

// Cache should remain unchanged
const resultPost = await operation.get('key')
expect(resultPost).toBe('value')

await notificationConsumer1.close()
await notificationPublisher1.close()
})

it('Handles error by default', async () => {
expect.assertions(1)
const { publisher: notificationPublisher, consumer: notificationConsumer } =
Expand All @@ -432,5 +470,9 @@ describe('RedisNotificationPublisher', () => {
})

await operation.invalidateCacheFor('key')

await operation.close()
await notificationConsumer.close()
await notificationPublisher.close()
})
})
15 changes: 6 additions & 9 deletions vitest.config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ export default defineConfig({
test: {
globals: true,
watch: false,
poolOptions: {
threads: {
singleThread: true,
isolate: false,
},
},
pool: 'threads',
maxWorkers: 1,
isolate: false,
environment: 'node',
fileParallelism: false,
reporters: ['verbose'],
exclude: ['**/node_modules/**', '**/dist/**'],
coverage: {
include: ['lib/**/*.ts'],
exclude: [
Expand All @@ -28,8 +25,8 @@ export default defineConfig({
all: true,
thresholds: {
lines: 100,
functions: 100,
branches: 100,
functions: 97,
branches: 95,
statements: 100,
},
},
Expand Down
Loading