Skip to content
Open

Dev #24

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 23 additions & 86 deletions packages/core/src/event/event-scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { PriorityQueue } from '@axrone/utility';

declare const __taskBrand: unique symbol;
declare const __schedulerBrand: unique symbol;

Expand Down Expand Up @@ -70,80 +72,7 @@ interface ITask<T = unknown> {
promise?: Promise<T>;
}

interface IPriorityBucket<T> {
tasks: T[];
head: number;
tail: number;
size: number;
}

class PriorityTaskQueue<T = ITask<any>> {
private readonly buckets: IPriorityBucket<T>[] = [];
private readonly bucketCapacity: number;
private totalSize = 0;

constructor(bucketCapacity = 256) {
this.bucketCapacity = bucketCapacity;

for (let i = 0; i <= TaskPriority.IDLE; i++) {
this.buckets[i] = {
tasks: new Array(bucketCapacity),
head: 0,
tail: 0,
size: 0,
};
}
}

enqueue(task: T, priority: TaskPriority): boolean {
const bucket = this.buckets[priority];

if (bucket.size >= this.bucketCapacity) {
return false;
}

bucket.tasks[bucket.tail] = task;
bucket.tail = (bucket.tail + 1) % this.bucketCapacity;
bucket.size++;
this.totalSize++;

return true;
}

dequeue(): T | null {
for (let priority = TaskPriority.IMMEDIATE; priority <= TaskPriority.IDLE; priority++) {
const bucket = this.buckets[priority];

if (bucket.size > 0) {
const task = bucket.tasks[bucket.head];
bucket.head = (bucket.head + 1) % this.bucketCapacity;
bucket.size--;
this.totalSize--;

return task;
}
}

return null;
}

get size(): number {
return this.totalSize;
}

clear(): void {
for (const bucket of this.buckets) {
bucket.head = 0;
bucket.tail = 0;
bucket.size = 0;
}
this.totalSize = 0;
}

getSizeByPriority(priority: TaskPriority): number {
return this.buckets[priority].size;
}
}

export class EventScheduler {
private readonly id: SchedulerId;
Expand All @@ -157,7 +86,7 @@ export class EventScheduler {
private readonly gcIntervalMs: number;
private readonly name: string;

private readonly taskQueue = new PriorityTaskQueue<ITask<any>>();
private readonly taskQueue = new PriorityQueue<ITask<any>, TaskPriority>();
private readonly activeTasks = new Map<TaskId, ITask<any>>();
private readonly taskMetrics = new Map<TaskId, ITaskMetrics>();

Expand All @@ -173,7 +102,7 @@ export class EventScheduler {

constructor(options: ISchedulerOptions = {}) {
this.id =
`scheduler_${Date.now()}_${Math.random().toString(36).substr(2, 9)}` as SchedulerId;
`scheduler_${Date.now()}_${Math.random().toString(36).substring(2, 11)}` as SchedulerId;
this.concurrencyLimit = Math.max(1, options.concurrencyLimit ?? Infinity);
this.maxQueueSize = Math.max(1, options.maxQueueSize ?? 10000);
this.enableMetrics = options.enableMetrics ?? true;
Expand All @@ -194,11 +123,11 @@ export class EventScheduler {
}

get queuedCount(): number {
return this.taskQueue.size;
return this.taskQueue.size as unknown as number;
}

get isAtCapacity(): boolean {
return this.taskQueue.size >= this.maxQueueSize;
return (this.taskQueue.size as unknown as number) >= this.maxQueueSize;
}

get disposed(): boolean {
Expand Down Expand Up @@ -255,7 +184,9 @@ export class EventScheduler {
});
}

if (!this.taskQueue.enqueue(task, priority)) {
try {
this.taskQueue.enqueue(task, priority);
} catch (error) {
_reject(new Error('Failed to enqueue task'));
return promise;
}
Expand Down Expand Up @@ -350,11 +281,13 @@ export class EventScheduler {
} catch (error) {}
});

let task: ITask | null;
while ((task = this.taskQueue.dequeue()) !== null) {
try {
task.reject(new Error('Scheduler disposed'));
} catch (error) {}
while (!this.taskQueue.isEmpty) {
const task = this.taskQueue.tryDequeue();
if (task) {
try {
task.reject(new Error('Scheduler disposed'));
} catch (error) {}
}
}

this.activeTasks.clear();
Expand All @@ -373,7 +306,7 @@ export class EventScheduler {
if (this.isDisposed) return;

while (this.activeCount < this.concurrencyLimit) {
const task = this.taskQueue.dequeue();
const task = this.taskQueue.tryDequeue();
if (!task) break;

this.executeTask(task);
Expand Down Expand Up @@ -454,8 +387,12 @@ export class EventScheduler {

setTimeout(() => {
if (!this.isDisposed && !this.isAtCapacity) {
this.taskQueue.enqueue(task, task.priority);
this.processQueue();
try {
this.taskQueue.enqueue(task, task.priority);
this.processQueue();
} catch {
task.reject(error);
}
} else {
task.reject(error);
}
Expand Down
Loading