1,803 changes: 1,661 additions & 142 deletions servers/cu/package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions servers/cu/package.json
@@ -11,6 +11,7 @@
"test": "node --experimental-wasm-memory64 --test"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.740.0",
"@fastify/middie": "^9.0.3",
"@permaweb/ao-loader": "^0.0.44",
"@permaweb/ao-scheduler-utils": "^0.0.25",
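The only dependency added is the AWS SDK v3 S3 client, which backs the evaluation-result offloading configured further down. As a rough orientation (not part of this PR), here is a minimal sketch of how such a client is typically constructed from the `AWS_*` environment variables introduced in `config.js`; the helper names and key/body handling are assumptions for illustration:

```js
import { S3Client, PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3'

// Construct the client from the env vars this PR threads through config.js
const s3 = new S3Client({
  region: process.env.AWS_REGION,
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
  }
})

// Hypothetical helpers: store and fetch a serialized evaluation result
const putEvaluation = (Bucket, Key, Body) =>
  s3.send(new PutObjectCommand({ Bucket, Key, Body }))

const getEvaluation = async (Bucket, Key) => {
  const res = await s3.send(new GetObjectCommand({ Bucket, Key }))
  return res.Body.transformToString() // SDK v3 response streams support this
}
```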
11 changes: 9 additions & 2 deletions servers/cu/src/app.js
@@ -45,6 +45,9 @@ export const server = pipeP(
logger('Checkpoint Interval Reached. Attempting to Checkpoint all Processes currently in WASM heap cache...')
await domain.apis.checkpointWasmMemoryCache().toPromise()
logger('Interval Checkpoint Done. Done checkpointing all processes in WASM heap cache.')
logger('Evaluations dump interval reached. Attempting to dump all evaluations...')
await domain.apis.dumpEvaluations().toPromise()
logger('Evaluations dump done.')
}, config.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL)
cacheCheckpointInterval.unref()
}
@@ -58,14 +61,18 @@ export const server = pipeP(

logger('Received SIGTERM. Attempting to Checkpoint all Processes currently in WASM heap cache...')
await domain.apis.checkpointWasmMemoryCache().toPromise()
logger('Done checkpointing all processes in WASM heap cache. Exiting...')
logger('Done checkpointing all processes in WASM heap cache. Attempting to dump all evaluations...')
await domain.apis.dumpEvaluations().toPromise()
logger('Done dumping all evaluations. Exiting...')
process.exit()
})

process.on('SIGUSR2', async () => {
logger('Received SIGUSR2. Manually Attempting to Checkpoint all Processes currently in WASM heap cache...')
await domain.apis.checkpointWasmMemoryCache().toPromise()
logger('SIGUSR2 Done. Done checkpointing all processes in WASM heap cache.')
logger('Done checkpointing all processes in WASM heap cache. Attempting to dump all evaluations...')
await domain.apis.dumpEvaluations().toPromise()
logger('SIGUSR2 Done. Done checkpointing all processes in WASM heap cache and dumping all evaluations.')
})

process.on('uncaughtException', (err) => {
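The SIGUSR2 handler above makes the checkpoint-and-dump path manually triggerable on a running CU. A small illustrative helper (not part of the PR) for sending that signal from another Node process, given the CU's pid:

```js
// usage: node trigger-dump.js <cu-pid>
const pid = Number(process.argv[2])
if (!Number.isInteger(pid) || pid <= 0) {
  console.error('usage: node trigger-dump.js <cu-pid>')
  process.exit(1)
}

// Prompts the CU to checkpoint its WASM heap cache and dump all evaluations
process.kill(pid, 'SIGUSR2')
console.log(`Sent SIGUSR2 to ${pid}`)
```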
71 changes: 64 additions & 7 deletions servers/cu/src/bootstrap.js
@@ -88,7 +88,6 @@ export const createApis = async (ctx) => {
const onCreateWorker = (type) => () => {
const workerId = randomBytes(8).toString('hex')
ctx.logger('Spinning up "%s" pool worker with id "%s"...', type, workerId)

return {
workerThreadOpts: {
workerData: {
@@ -104,7 +103,13 @@
MODE: ctx.MODE,
LOG_CONFIG_PATH: ctx.LOG_CONFIG_PATH,
DEFAULT_LOG_LEVEL: ctx.DEFAULT_LOG_LEVEL,
DISABLE_PROCESS_EVALUATION_CACHE: ctx.DISABLE_PROCESS_EVALUATION_CACHE
DISABLE_PROCESS_EVALUATION_CACHE: ctx.DISABLE_PROCESS_EVALUATION_CACHE,
EVALUATION_RESULT_DIR: ctx.EVALUATION_RESULT_DIR,
EVALUATION_RESULT_BUCKET: ctx.EVALUATION_RESULT_BUCKET,
AWS_ACCESS_KEY_ID: ctx.AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY: ctx.AWS_SECRET_ACCESS_KEY,
AWS_REGION: ctx.AWS_REGION,
__dirname
}
}
}
@@ -163,6 +168,14 @@ export const createApis = async (ctx) => {
})
const dryRunWorkQueue = new PQueue({ concurrency: maxDryRunWorkerTheads })

const loadEvaluationWorker = join(__dirname, 'effects', 'worker', 'loadEvaluation', 'index.js')
const loadEvaluationWorkerPool = workerpool.pool(loadEvaluationWorker, {
minWorkers: (ctx.EVALUATION_RESULT_BUCKET && ctx.EVALUATION_RESULT_DIR) ? 1 : 0,
maxWorkers: (ctx.EVALUATION_RESULT_BUCKET && ctx.EVALUATION_RESULT_DIR) ? 2 : 0,
onCreateWorker: onCreateWorker('loadEvaluation')
})
const loadEvaluationWorkQueue = new PQueue({ concurrency: 2 })

const arweave = ArweaveClient.createWalletClient()
const address = ArweaveClient.addressWith({ WALLET: ctx.WALLET, arweave })

@@ -257,6 +270,14 @@ export const createApis = async (ctx) => {
cache: WasmClient.createWasmModuleCache({ MAX_SIZE: ctx.WASM_MODULE_CACHE_MAX_SIZE })
})

const loadEvaluation = (args) => loadEvaluationWorkQueue.add(() =>
Promise.resolve()
.then(() => loadEvaluationWorkerPool.exec('loadEvaluation', [args]))
.catch((e) => {
throw new Error(`Error in loadEvaluation worker: ${e}`)
})
)

const stats = statsWith({
gauge,
loadWorkerStats: () => ({
@@ -280,6 +301,10 @@
*/
pendingTasks: dryRunWorkQueue.size
})
// hydrator: ({
// ...hydratorWorkerPool.stats(),
// pendingTasks: hydratorWorkQueue.size
// })
}),
/**
* https://nodejs.org/api/process.html#processmemoryusage
@@ -344,8 +369,30 @@ export const createApis = async (ctx) => {
evaluationCounter,
// gasCounter,
saveProcess: AoProcessClient.saveProcessWith({ db, logger }),
findEvaluation: AoEvaluationClient.findEvaluationWith({ db, logger }),
saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger }),
findEvaluation: AoEvaluationClient.findEvaluationWith({
db,
logger,
EVALUATION_RESULT_DIR: ctx.EVALUATION_RESULT_DIR,
EVALUATION_RESULT_BUCKET: ctx.EVALUATION_RESULT_BUCKET,
loadEvaluation: (args) => {
return loadEvaluationWorkQueue.add(() =>
Promise.resolve()
.then(() => {
return loadEvaluationWorkerPool.exec('loadEvaluation', [args])
})
.catch((e) => {
console.error('Error in loadEvaluation worker', e)
throw e
})
)
}
}),
saveEvaluation: AoEvaluationClient.saveEvaluationWith({
db,
logger,
EVALUATION_RESULT_DIR: ctx.EVALUATION_RESULT_DIR,
EVALUATION_RESULT_BUCKET: ctx.EVALUATION_RESULT_BUCKET
}),
findBlocks: AoBlockClient.findBlocksWith({ db, logger }),
saveBlocks: AoBlockClient.saveBlocksWith({ db, logger }),
loadBlocksMeta: AoBlockClient.loadBlocksMetaWith({ fetch: ctx.fetch, GRAPHQL_URLS: BLOCK_GRAPHQL_ARRAY, pageSize: 90, logger }),
@@ -467,13 +514,13 @@ export const createApis = async (ctx) => {
const readCronResultsLogger = ctx.logger.child('readCronResults')
const readCronResults = readCronResultsWith({
...sharedDeps(readCronResultsLogger),
findEvaluations: AoEvaluationClient.findEvaluationsWith({ db, logger: readCronResultsLogger })
findEvaluations: AoEvaluationClient.findEvaluationsWith({ db, logger: readCronResultsLogger, loadEvaluation, EVALUATION_RESULT_DIR: ctx.EVALUATION_RESULT_DIR, EVALUATION_RESULT_BUCKET: ctx.EVALUATION_RESULT_BUCKET })
})

const readResultsLogger = ctx.logger.child('readResults')
const readResults = readResultsWith({
...sharedDeps(readResultsLogger),
findEvaluations: AoEvaluationClient.findEvaluationsWith({ db, logger: readResultsLogger })
findEvaluations: AoEvaluationClient.findEvaluationsWith({ db, logger: readResultsLogger, loadEvaluation, EVALUATION_RESULT_DIR: ctx.EVALUATION_RESULT_DIR, EVALUATION_RESULT_BUCKET: ctx.EVALUATION_RESULT_BUCKET })
})

let checkpointP
@@ -529,5 +576,15 @@ export const createApis = async (ctx) => {

const healthcheck = healthcheckWith({ walletAddress: address })

return { metrics, stats, pendingReadStates, readState, dryRun, readResult, readResults, readCronResults, checkpointWasmMemoryCache, healthcheck }
const dumpEvaluations = fromPromise(async (args) => loadEvaluationWorkQueue.add(() =>
Promise.resolve()
.then(() => loadEvaluationWorkerPool.exec('dumpEvaluations', [args]))
.catch((e) => {
ctx.logger('Error in loadEvaluation worker while dumping evaluations', e)
throw new Error(`Error in loadEvaluation worker: ${e}`)
})
)
)

return { metrics, stats, pendingReadStates, readState, dryRun, readResult, readResults, readCronResults, checkpointWasmMemoryCache, healthcheck, dumpEvaluations }
}
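`bootstrap.js` above expects a worker module at `effects/worker/loadEvaluation/index.js` exposing `loadEvaluation` and `dumpEvaluations` methods; that file is not shown in this excerpt. The sketch below is an assumption about its general shape, wiring the `workerData` passed by `onCreateWorker` to the S3 client — the key scheme and serialization are illustrative only:

```js
// Hypothetical shape of effects/worker/loadEvaluation/index.js (not in this diff)
import { workerData } from 'node:worker_threads'
import workerpool from 'workerpool'
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3'

const {
  EVALUATION_RESULT_BUCKET,
  AWS_REGION,
  AWS_ACCESS_KEY_ID,
  AWS_SECRET_ACCESS_KEY
} = workerData

const s3 = new S3Client({
  region: AWS_REGION,
  credentials: { accessKeyId: AWS_ACCESS_KEY_ID, secretAccessKey: AWS_SECRET_ACCESS_KEY }
})

workerpool.worker({
  // Fetch a single evaluation result from the bucket; the key is a placeholder
  async loadEvaluation ({ key }) {
    const res = await s3.send(new GetObjectCommand({ Bucket: EVALUATION_RESULT_BUCKET, Key: key }))
    return JSON.parse(await res.Body.transformToString())
  },
  // Flush evaluations buffered under workerData.EVALUATION_RESULT_DIR up to the
  // bucket; left as a stub since the real implementation is not in this excerpt
  async dumpEvaluations () {
    // e.g. read pending files from the directory and PutObject each one
  }
})
```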
14 changes: 12 additions & 2 deletions servers/cu/src/config.js
@@ -172,7 +172,12 @@ const CONFIG_ENVS = {
BUSY_THRESHOLD: process.env.BUSY_THRESHOLD || 0, // disabled
RESTRICT_PROCESSES: process.env.RESTRICT_PROCESSES || [],
ALLOW_PROCESSES: process.env.ALLOW_PROCESSES || [],
ALLOW_OWNERS: process.env.ALLOW_OWNERS || []
ALLOW_OWNERS: process.env.ALLOW_OWNERS || [],
EVALUATION_RESULT_DIR: process.env.EVALUATION_RESULT_DIR,
EVALUATION_RESULT_BUCKET: process.env.EVALUATION_RESULT_BUCKET,
AWS_ACCESS_KEY_ID: process.env.AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY: process.env.AWS_SECRET_ACCESS_KEY,
AWS_REGION: process.env.AWS_REGION
},
production: {
MODE,
@@ -224,7 +229,12 @@ const CONFIG_ENVS = {
BUSY_THRESHOLD: process.env.BUSY_THRESHOLD || 0, // disabled
RESTRICT_PROCESSES: process.env.RESTRICT_PROCESSES || [],
ALLOW_PROCESSES: process.env.ALLOW_PROCESSES || [],
ALLOW_OWNERS: process.env.ALLOW_OWNERS || []
ALLOW_OWNERS: process.env.ALLOW_OWNERS || [],
EVALUATION_RESULT_DIR: process.env.EVALUATION_RESULT_DIR,
EVALUATION_RESULT_BUCKET: process.env.EVALUATION_RESULT_BUCKET,
AWS_ACCESS_KEY_ID: process.env.AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY: process.env.AWS_SECRET_ACCESS_KEY,
AWS_REGION: process.env.AWS_REGION
}
}

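All five new settings are plain environment passthroughs in both the development and production blocks, and they are optional in the schema, so offloading is opt-in. As a rough illustration of the gate `bootstrap.js` applies when sizing the `loadEvaluation` pool (both the directory and the bucket must be set):

```js
// Mirrors the condition used for minWorkers/maxWorkers in bootstrap.js
const EVALUATION_RESULT_DIR = process.env.EVALUATION_RESULT_DIR
const EVALUATION_RESULT_BUCKET = process.env.EVALUATION_RESULT_BUCKET

const offloadEnabled = Boolean(EVALUATION_RESULT_DIR && EVALUATION_RESULT_BUCKET)
console.log('evaluation result offloading enabled:', offloadEnabled)
```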
3 changes: 2 additions & 1 deletion servers/cu/src/domain/dal.js
@@ -102,7 +102,8 @@ export const findEvaluationSchema = z.function()
processId: z.string(),
to: z.coerce.number().nullish(),
ordinate: z.coerce.string().nullish(),
cron: z.string().nullish()
cron: z.string().nullish(),
messageId: z.string().nullish()
}))
.returns(z.promise(evaluationSchema))

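`findEvaluationSchema` gains an optional `messageId`, which `chainEvaluation.js` and `loadProcess.js` pass through below. A small self-contained sketch of the updated argument shape (zod only, values are placeholders):

```js
import { z } from 'zod'

// Stand-in mirroring the argument object of the updated findEvaluationSchema
const findEvaluationArgs = z.object({
  processId: z.string(),
  to: z.coerce.number().nullish(),
  ordinate: z.coerce.string().nullish(),
  cron: z.string().nullish(),
  messageId: z.string().nullish() // new optional field added by this PR
})

// Callers can now identify an evaluation by messageId as well
console.log(findEvaluationArgs.parse({
  processId: 'some-process-id',
  messageId: 'some-message-id'
}))
```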
3 changes: 2 additions & 1 deletion servers/cu/src/domain/lib/chainEvaluation.js
@@ -23,7 +23,8 @@ function loadLatestEvaluationWith ({ findEvaluation, findLatestProcessMemory, lo
processId: ctx.id,
to: ctx.to,
ordinate: ctx.ordinate,
cron: ctx.cron
cron: ctx.cron,
messageId: ctx.messageId
})
.map((evaluation) => {
logger(
2 changes: 1 addition & 1 deletion servers/cu/src/domain/lib/gatherResults.js
@@ -64,7 +64,7 @@ export function gatherResultsWith (env) {
return findEvaluations({
processId: ctx.processId,
from: ctx.from,
to: ctx.to,
to: ctx.to ?? {},
sort,
limit: ctx.limit,
onlyCron: ctx.onlyCron
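The only change here defaults `to` to an empty object before it reaches `findEvaluations`. For clarity, the nullish-coalescing operator substitutes the default only for `null`/`undefined`; falsy but present values pass through, as this small check shows:

```js
// `??` falls back only for null/undefined, unlike `||`
for (const to of [undefined, null, 0, 1720000000000]) {
  console.log(to, '->', to ?? {})
}
// undefined -> {}
// null -> {}
// 0 -> 0
// 1720000000000 -> 1720000000000
```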
3 changes: 2 additions & 1 deletion servers/cu/src/domain/lib/loadProcess.js
@@ -77,7 +77,8 @@ function loadLatestEvaluationWith ({ findEvaluation, findLatestProcessMemory, sa
processId: ctx.id,
to: ctx.to,
ordinate: ctx.ordinate,
cron: ctx.cron
cron: ctx.cron,
messageId: ctx.messageId
})
.map((evaluation) => {
logger(
42 changes: 32 additions & 10 deletions servers/cu/src/domain/model.js
@@ -204,7 +204,27 @@ export const domainConfigSchema = z.object({
* A list of wallets whose processes the CU should exclusively allow
* aka. whitelist of processes created by these wallets
*/
ALLOW_OWNERS: commaDelimitedArraySchema
ALLOW_OWNERS: commaDelimitedArraySchema,
/**
* The directory to store evaluation results
*/
EVALUATION_RESULT_DIR: z.string().optional(),
/**
* The bucket to store evaluation results
*/
EVALUATION_RESULT_BUCKET: z.string().optional(),
/**
* The access key id for the bucket to store evaluation results
*/
AWS_ACCESS_KEY_ID: z.string().optional(),
/**
* The secret access key for the bucket to store evaluation results
*/
AWS_SECRET_ACCESS_KEY: z.string().optional(),
/**
* The region of the bucket to store evaluation results
*/
AWS_REGION: z.string().optional()
})

export const bufferSchema = z.any().refine(buffer => {
@@ -387,6 +407,16 @@ export const scheduleSchema = z.object({
message: z.any()
})

export const outputSchema = z.object({
Memory: z.any().nullish(),
Messages: z.array(z.any()).nullish(),
Assignments: z.array(z.any()).nullish(),
Spawns: z.array(z.any()).nullish(),
Output: z.any().nullish(),
GasUsed: z.number().nullish(),
Error: z.any().nullish()
})

export const evaluationSchema = z.object({
/**
* the id of the process that the message was performed upon
@@ -448,13 +478,5 @@ export const evaluationSchema = z.object({
*
* This is the output of process, after the action was applied
*/
output: z.object({
Memory: z.any().nullish(),
Messages: z.array(z.any()).nullish(),
Assignments: z.array(z.any()).nullish(),
Spawns: z.array(z.any()).nullish(),
Output: z.any().nullish(),
GasUsed: z.number().nullish(),
Error: z.any().nullish()
})
output: outputSchema
})