Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions lib/AbstractGroupCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ export abstract class AbstractGroupCache<LoadedValue, LoadParams = string, LoadM
public getInMemoryOnly(loadParams: LoadParams, group: string): LoadedValue | undefined | null {
const key = this.cacheKeyFromLoadParamsResolver(loadParams)
if (this.inMemoryCache.ttlLeftBeforeRefreshInMsecs) {
const groupLoads = this.resolveGroupLoads(group)
if (!groupLoads.has(key)) {
if (!this.runningLoads.get(group)?.has(key)) {
const expirationTime = this.inMemoryCache.getExpirationTimeFromGroup(key, group)
if (expirationTime && expirationTime - Date.now() < this.inMemoryCache.ttlLeftBeforeRefreshInMsecs) {
void this.getAsyncOnly(loadParams, group)
Expand Down
3 changes: 3 additions & 0 deletions lib/GroupLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ export class GroupLoader<LoadedValue, LoadParams = string, LoadManyParams = Load
})
.finally(() => {
groupSet!.delete(key)
if (groupSet!.size === 0) {
this.groupRefreshFlags.delete(group)
}
})
}
}
Expand Down
11 changes: 2 additions & 9 deletions lib/Loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ export class Loader<LoadedValue, LoadParams = string, LoadManyParams = LoadParam

public async forceSetValue(key: string, newValue: LoadedValue | null) {
this.inMemoryCache.set(key, newValue)
/* v8 ignore next -- @preserve */
if (this.runningLoads.has(key)) {
this.runningLoads.delete(key)
}
this.runningLoads.delete(key)

if (this.asyncCache) {
await this.asyncCache.set(key, newValue).catch((err) => {
Expand All @@ -96,11 +93,7 @@ export class Loader<LoadedValue, LoadParams = string, LoadManyParams = LoadParam
return this.loadFromLoaders(key, loadParams).then((finalValue) => {
if (finalValue !== undefined) {
this.inMemoryCache.set(key, finalValue)

/* v8 ignore next -- @preserve */
if (this.runningLoads.has(key)) {
this.runningLoads.delete(key)
}
this.runningLoads.delete(key)
}

// In order to keep other cluster nodes in-sync with potentially changed entry, we force them to refresh too
Expand Down
3 changes: 2 additions & 1 deletion lib/memory/InMemoryGroupCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ export class InMemoryGroupCache<T> implements SynchronousGroupCache<T> {
getManyFromGroup(keys: string[], group: string): GetManyResult<T> {
const resolvedValues: T[] = []
const unresolvedKeys: string[] = []
const groupCache = this.resolveGroup(group)

for (let i = 0; i < keys.length; i++) {
const resolvedValue = this.getFromGroup(keys[i], group)
const resolvedValue = groupCache.get(keys[i])
if (resolvedValue) {
resolvedValues.push(resolvedValue)
} else {
Expand Down
9 changes: 7 additions & 2 deletions lib/redis/RedisGroupNotificationConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export class RedisGroupNotificationConsumer<LoadedValue> extends AbstractNotific
> {
private readonly redis: Redis
private readonly channel: string
private messageHandler?: (channel: string, message: string) => void

constructor(redis: Redis, config: RedisConsumerConfig) {
super(config.serverUuid)
Expand All @@ -22,6 +23,9 @@ export class RedisGroupNotificationConsumer<LoadedValue> extends AbstractNotific
}

async close(): Promise<void> {
if (this.messageHandler) {
this.redis.removeListener('message', this.messageHandler)
}
try {
await this.redis.unsubscribe(this.channel)
} catch {
Expand All @@ -36,7 +40,7 @@ export class RedisGroupNotificationConsumer<LoadedValue> extends AbstractNotific

subscribe(): Promise<void> {
return this.redis.subscribe(this.channel).then(() => {
this.redis.on('message', (channel, message) => {
this.messageHandler = (channel, message) => {
const parsedMessage: GroupNotificationCommand = JSON.parse(message)
// this is a local message, ignore
if (parsedMessage.originUuid === this.serverUuid) {
Expand All @@ -57,7 +61,8 @@ export class RedisGroupNotificationConsumer<LoadedValue> extends AbstractNotific
if (parsedMessage.actionId === 'CLEAR') {
return this.targetCache.clear()
}
})
}
this.redis.on('message', this.messageHandler)
})
}
}
9 changes: 7 additions & 2 deletions lib/redis/RedisNotificationConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export class RedisNotificationConsumer<LoadedValue> extends AbstractNotification
> {
private readonly redis: Redis
private readonly channel: string
private messageHandler?: (channel: string, message: string) => void

constructor(redis: Redis, config: RedisConsumerConfig) {
super(config.serverUuid)
Expand All @@ -27,6 +28,9 @@ export class RedisNotificationConsumer<LoadedValue> extends AbstractNotification
}

async close(): Promise<void> {
if (this.messageHandler) {
this.redis.removeListener('message', this.messageHandler)
}
try {
await this.redis.unsubscribe(this.channel)
} catch {
Expand All @@ -41,7 +45,7 @@ export class RedisNotificationConsumer<LoadedValue> extends AbstractNotification

subscribe(): Promise<void> {
return this.redis.subscribe(this.channel).then(() => {
this.redis.on('message', (channel, message) => {
this.messageHandler = (channel, message) => {
const parsedMessage: NotificationCommand = JSON.parse(message)
// this is a local message, ignore
if (parsedMessage.originUuid === this.serverUuid) {
Expand All @@ -66,7 +70,8 @@ export class RedisNotificationConsumer<LoadedValue> extends AbstractNotification
(parsedMessage as SetNotificationCommand<LoadedValue>).value,
)
}
})
}
this.redis.on('message', this.messageHandler)
})
}
}
13 changes: 13 additions & 0 deletions lib/util/unique.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@ import { describe, expect, it } from 'vitest'
import { unique } from './unique'

describe('unique', () => {
it('returns the same array reference when there are no duplicates', () => {
const input = [1, 2, 3, 'a', 'b']
const result = unique(input)
expect(result).toBe(input)
})

it('returns a new array when there are duplicates', () => {
const input = [1, 2, 2, 3]
const result = unique(input)
expect(result).not.toBe(input)
expect(result).toEqual([1, 2, 3])
})

it('returns a new array of mixed primitive value without duplicates', () => {
const objectA = {}
const objectB = {}
Expand Down
5 changes: 4 additions & 1 deletion lib/util/unique.ts
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
export const unique = <T>(arr: T[]): T[] => Array.from(new Set(arr))
export const unique = <T>(arr: T[]): T[] => {
const set = new Set(arr)
return set.size === arr.length ? arr : Array.from(set)
}
28 changes: 28 additions & 0 deletions test/GroupLoader-async-refresh.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,34 @@ describe('GroupLoader Async Refresh', () => {
expect(expirationTimePost! > expirationTimePre!).toBe(true)
})

it('cleans up groupRefreshFlags after background refresh completes', async () => {
const loader = new CountingGroupedLoader(userValues)
const asyncCache = new RedisGroupCache<User>(redis, {
ttlInMsecs: 150,
json: true,
ttlLeftBeforeRefreshInMsecs: 75,
})

const operation = new GroupLoader<User>({
asyncCache,
dataSources: [loader],
})

expect(await operation.get(user1.userId, user1.companyId)).toEqual(user1)
expect(loader.counter).toBe(1)

await setTimeout(100)
// kick off the refresh
expect(await operation.get(user1.userId, user1.companyId)).toEqual(user1)
await setTimeout(5)
expect(loader.counter).toBe(2)

// @ts-ignore
const groupRefreshFlags: Map<string, Set<string>> = operation.groupRefreshFlags
// The empty Set for the group should have been cleaned up
expect(groupRefreshFlags.has(user1.companyId)).toBe(false)
})

it('async background refresh errors do not crash app', async () => {
const loader = new CountingGroupedLoader(userValues)
const asyncCache = new RedisGroupCache<User>(redis, {
Expand Down
23 changes: 23 additions & 0 deletions test/GroupLoader-main.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,29 @@ describe('GroupLoader Main', () => {
expect(resultPost).toEqual(user1)
})

it('does not create empty runningLoads entry when calling getInMemoryOnly', async () => {
const operation = new GroupLoader<User>({
inMemoryCache: {
cacheId: 'dummy',
ttlInMsecs: 150,
ttlLeftBeforeRefreshInMsecs: 75,
},
dataSources: [new CountingGroupedLoader(userValues)],
})

// Load a value first
expect(await operation.get(user1.userId, user1.companyId)).toEqual(user1)

// getInMemoryOnly should NOT create an entry in runningLoads for a group that doesn't need refresh
operation.getInMemoryOnly(user1.userId, user1.companyId)

// @ts-ignore
const runningLoads: Map<string, Map<string, unknown>> = operation.runningLoads
// Either the group doesn't exist in runningLoads, or it has no entries
const groupLoads = runningLoads.get(user1.companyId)
expect(!groupLoads || groupLoads.size === 0).toBe(true)
})

it('triggers background refresh when threshold is set and reached', async () => {
const loader = new CountingGroupedLoader(userValues)

Expand Down
24 changes: 24 additions & 0 deletions test/redis/RedisGroupNotificationPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,30 @@ describe('RedisGroupNotificationPublisher', () => {
await notificationPublisher1.close()
})

it('Removes message listeners on close', async () => {
const { publisher: notificationPublisher1, consumer: notificationConsumer1 } =
createGroupNotificationPair({
channel: CHANNEL_ID,
consumerRedis: redisConsumer,
publisherRedis: redisPublisher,
})

const operation = new GroupLoader({
inMemoryCache: IN_MEMORY_CACHE_CONFIG,
asyncCache: new DummyGroupedCache(userValues),
notificationConsumer: notificationConsumer1,
notificationPublisher: notificationPublisher1,
})
await operation.init()

expect(redisConsumer.listenerCount('message')).toBeGreaterThan(0)

await notificationConsumer1.close()
await notificationPublisher1.close()

expect(redisConsumer.listenerCount('message')).toBe(0)
})

it('Handles error by default', async () => {
expect.assertions(1)
const { publisher: notificationPublisher, consumer: notificationConsumer } =
Expand Down
24 changes: 24 additions & 0 deletions test/redis/RedisNotificationPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,30 @@ describe('RedisNotificationPublisher', () => {
await notificationPublisher1.close()
})

it('Removes message listeners on close', async () => {
const { publisher: notificationPublisher1, consumer: notificationConsumer1 } =
createNotificationPair<string>({
channel: CHANNEL_ID,
consumerRedis: redisConsumer,
publisherRedis: redisPublisher,
})

const operation = new Loader<string>({
inMemoryCache: IN_MEMORY_CACHE_CONFIG,
asyncCache: new DummyCache('value'),
notificationConsumer: notificationConsumer1,
notificationPublisher: notificationPublisher1,
})
await operation.init()

expect(redisConsumer.listenerCount('message')).toBeGreaterThan(0)

await notificationConsumer1.close()
await notificationPublisher1.close()

expect(redisConsumer.listenerCount('message')).toBe(0)
})

it('Handles error by default', async () => {
expect.assertions(1)
const { publisher: notificationPublisher, consumer: notificationConsumer } =
Expand Down
Loading