diff --git a/packages/repco-core/package.json b/packages/repco-core/package.json
index 1b16dc3e..f5115c19 100644
--- a/packages/repco-core/package.json
+++ b/packages/repco-core/package.json
@@ -19,6 +19,7 @@
   "dependencies": {
     "@ipld/car": "^5.1.0",
     "@noble/hashes": "^1.2.0",
+    "@plussub/srt-vtt-parser": "^1.1.1",
     "@types/b4a": "^1.6.0",
     "@ucans/ucans": "^0.11.2",
     "b4a": "^1.6.1",
@@ -44,9 +45,11 @@
     "@types/tempy": "^0.3.0",
     "brittle": "^2.4.0",
     "esbuild": "^0.14.51",
+    "express": "^4.18.1",
     "get-port": "^6.1.2",
     "nanobench-utils": "^1.0.1",
     "split2": "^4.2.0",
-    "tempy": "^3.0.0"
+    "tempy": "^3.0.0",
+    "why-is-node-running": "^2.2.2"
   }
 }
diff --git a/packages/repco-core/src/datasource.ts b/packages/repco-core/src/datasource.ts
index 67cb604b..372be0c7 100644
--- a/packages/repco-core/src/datasource.ts
+++ b/packages/repco-core/src/datasource.ts
@@ -1,5 +1,7 @@
+import { parse as parseSubtitle } from '@plussub/srt-vtt-parser'
 import { EventEmitter } from 'node:events'
 import { createLogger } from 'repco-common'
+import { fetch } from 'undici'
 import { EntityForm } from './entity.js'
 import { IngestOutcome, IngestState } from './ingest.js'
 import { DataSourcePluginRegistry } from './plugins.js'
@@ -644,9 +646,55 @@ async function persistAndMapSourceRecords(
       errors.push(error)
     }
   }
+  await Promise.all(entities.map(processBeforeSave))
   return { entities, errors }
 }
 
+// Post-process a mapped entity in place before it is saved.
+//
+// A Transcript that has a subtitleUrl but no text gets its text fetched from
+// that URL. This is best-effort: a failed fetch is logged as a warning and the
+// entity is left without text, so it does not reject the surrounding ingest.
+async function processBeforeSave(entity: EntityForm): Promise<void> {
+  if (entity.type !== 'Transcript') return
+  const content = entity.content
+  if (content.text || !content.subtitleUrl) return
+  const url = content.subtitleUrl
+  try {
+    content.text = await fetchSubtitle(url)
+    log.debug(`Fetched subtitle text for ${content.uid} from ${url}`)
+  } catch (error) {
+    log.warn({
+      error,
+      message: `Failed to fetch subtitle for ${content.uid} from ${url}`,
+    })
+  }
+}
+
+// Fetch the subtitle file at `url` and return the text of all parsed entries,
+// joined by newlines.
+//
+// The request is aborted if it has not completed within 2 seconds. Rejects if
+// the request fails or is aborted, or if the response status is not ok.
+async function fetchSubtitle(url: string): Promise<string> {
+  const timeout = 2000
+  const controller = new AbortController()
+  const id = setTimeout(() => controller.abort(), timeout)
+  try {
+    const res = await fetch(url, { signal: controller.signal })
+    if (!res.ok) {
+      throw new Error(`Failed to fetch: ${res.status} ${res.statusText}`)
+    }
+    const body = await res.text()
+    const { entries } = parseSubtitle(body)
+    return entries.map(({ text }) => text).join('\n')
+  } finally {
+    // Clear the timer on every path, including when fetch() itself rejects,
+    // so that no pending timeout is left behind.
+    clearTimeout(id)
+  }
+}
+
 // Recreate all entities originating from a particular DataSource
 //
 // This traverses all source records for a datasource and recreates entity revisions for each.
diff --git a/packages/repco-core/test/subtitles.ts b/packages/repco-core/test/subtitles.ts
new file mode 100644
index 00000000..ffd9a05b
--- /dev/null
+++ b/packages/repco-core/test/subtitles.ts
@@ -0,0 +1,170 @@
+/* eslint-disable @typescript-eslint/no-non-null-assertion */
+
+import test from 'brittle'
+import express from 'express'
+import getPort from 'get-port'
+import { setup } from './util/setup.js'
+import { DataSource, repoRegistry } from '../lib.js'
+import {
+  BaseDataSource,
+  DataSourceDefinition,
+  FetchUpdatesResult,
+  ingestUpdatesFromDataSources,
+  SourceRecordForm,
+} from '../src/datasource.js'
+import { EntityForm } from '../src/entity.js'
+import { DataSourcePlugin, DataSourcePluginRegistry } from '../src/plugins.js'
+
+type Config = {
+  subtitleHost: string
+}
+
+// Plugin that creates TestDataSource instances from a Config.
+class TestDataSourcePlugin implements DataSourcePlugin {
+  createInstance(config: any) {
+    return new TestDataSource(config as Config)
+  }
+  get definition() {
+    return {
+      name: 'test',
+      uid: 'ds:test',
+    }
+  }
+}
+
+// Data source that emits one MediaAsset and one Transcript on the first
+// fetch. The Transcript has no text, only a subtitleUrl on subtitleHost.
+class TestDataSource extends BaseDataSource implements DataSource {
+  public subtitleConfig: Config
+  get definition(): DataSourceDefinition {
+    return {
+      name: 'TestDataSource',
+      uid: 'ds:test:1',
+      pluginUid: 'ds:test',
+    }
+  }
+
+  constructor(config: Config) {
+    super()
+    this.subtitleConfig = config
+  }
+
+  canFetchUri(uid: string): boolean {
+    return uid.startsWith('test:')
+  }
+
+  async fetchUpdates(cursorString: string | null): Promise<FetchUpdatesResult> {
+    const cursor = Number(cursorString)
+    const records = []
+    // Only a cursor that converts to 0 (e.g. null) yields records.
+    if (cursor === 0) {
+      const media: EntityForm = {
+        type: 'MediaAsset',
+        headers: {
+          EntityUris: ['media1'],
+        },
+        content: {
+          title: 'media1',
+          description: 'boo',
+          mediaType: 'video',
+        },
+      }
+      const record1 = {
+        sourceUri: 'subtitle1',
+        contentType: 'text',
+        body: JSON.stringify(media),
+        sourceType: 'entity',
+      }
+      records.push(record1)
+      const url = this.subtitleConfig.subtitleHost + '/subtitle1.vtt'
+      const entity: EntityForm = {
+        type: 'Transcript',
+        content: {
+          text: '',
+          subtitleUrl: url,
+          author: 'asdf',
+          engine: 'foo',
+          license: 'bar',
+          language: 'de',
+          MediaAsset: {
+            uri: 'media1',
+          },
+        },
+      }
+      const record2 = {
+        sourceUri: 'subtitle1',
+        contentType: 'text',
+        body: JSON.stringify(entity),
+        sourceType: 'entity',
+      }
+      records.push(record2)
+    }
+
+    return {
+      cursor: '' + (cursor + 1),
+      records,
+    }
+  }
+  // Always rejects, hence Promise<never>.
+  async fetchByUri(_uid: string): Promise<never> {
+    throw new Error('Failed to fetch')
+  }
+
+  async mapSourceRecord(record: SourceRecordForm): Promise<EntityForm[]> {
+    const form = JSON.parse(record.body) as EntityForm
+    return [form]
+  }
+}
+
+test('subtitle fetch after ingest', async (assert) => {
+  const subtitlePort = await getPort()
+  const subtitleApp = express()
+  const VTT = `WEBVTT
+00:00:00.500 --> 00:00:02.000
+The Web is always changing
+
+00:00:02.500 --> 00:00:04.300
+and the way we access it is changing`
+
+  subtitleApp.get('/subtitle1.vtt', (_req, res) => {
+    res.header('content-type', 'text/vtt')
+    res.send(VTT)
+  })
+  const subtitleServer = subtitleApp.listen(subtitlePort)
+  // Close the server in a teardown so it is also shut down when the test
+  // throws before reaching its last line.
+  assert.teardown(() => {
+    subtitleServer.closeAllConnections()
+    subtitleServer.close()
+  })
+  const subtitleHost = `http://localhost:${subtitlePort}`
+
+  const prisma = await setup(assert)
+  const repo = await repoRegistry.create(prisma, 'test')
+  const plugins = new DataSourcePluginRegistry()
+  plugins.register(new TestDataSourcePlugin())
+  await repo.dsr.create(
+    repo.prisma,
+    plugins,
+    'ds:test',
+    { subtitleHost },
+    repo.did,
+  )
+  await ingestUpdatesFromDataSources(repo)
+  const uri = 'media1'
+  const entities = await prisma.mediaAsset.findMany({
+    where: { Revision: { entityUris: { has: uri } } },
+    include: {
+      Transcripts: true,
+    },
+  })
+  assert.is(entities.length, 1)
+  const media = entities[0]
+  assert.is(media.Transcripts.length, 1)
+  const transcript = media.Transcripts[0]
+  const expectedText = [
+    'The Web is always changing',
+    'and the way we access it is changing',
+  ].join('\n')
+  assert.is(transcript.text, expectedText)
+})
diff --git a/packages/repco-core/test/util/setup.ts b/packages/repco-core/test/util/setup.ts
index bcf5e9b1..199f0c13 100644
--- a/packages/repco-core/test/util/setup.ts
+++ b/packages/repco-core/test/util/setup.ts
@@ -25,7 +25,7 @@ const defaultLogFn = (...args: any[]) =>
 
 export async function setup(test: Test) {
   const { teardown, databaseUrl } = await setupDb(defaultLogFn)
-  test.teardown(teardown, { order: 0 })
+  test.teardown(teardown, { order: 2 })
   process.env.DATABASE_URL = databaseUrl
   const log: Prisma.LogDefinition[] = []
   if (process.env.QUERY_LOG) log.push({ emit: 'event', level: 'query' })
@@ -41,6 +41,7 @@ export async function setup(test: Test) {
       test.comment(`QUERY: ${e.query} ${e.params}`)
     }
   })
+  test.teardown(() => prisma.$disconnect(), { order: 1 })
   return prisma
 }
 
diff --git a/yarn.lock b/yarn.lock
index df048ddc..e4c5429f 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -2427,6 +2427,11 @@
   resolved "https://registry.yarnpkg.com/@pkgjs/parseargs/-/parseargs-0.11.0.tgz#a77ea742fab25775145434eb1d2328cf5013ac33"
   integrity sha512-+1VkjdD0QBLPodGrJUeqarH8VAIvQODIbwh9XpP5Syisf7YoQgsJKPNFoqqLQlu+VQ/tVSshMR6loPMn8U+dPg==
 
+"@plussub/srt-vtt-parser@^1.1.1":
+  version "1.1.1"
+  resolved "https://registry.yarnpkg.com/@plussub/srt-vtt-parser/-/srt-vtt-parser-1.1.1.tgz#62103c1e57eea3422afe2bff1a296774c1dc1027"
+  integrity sha512-uHZLKANNVoXSEjLRtkyyDFXaiC86ZNnjPnHve5bs5RSiXV0DfVlUvXMZXzfR7X+z2OVdoLFn6oYsS2a/VkN8Ew==
+
 "@prisma/client@^4.12.0":
   version "4.16.2"
   resolved "https://registry.yarnpkg.com/@prisma/client/-/client-4.16.2.tgz#3bb9ebd49b35c8236b3d468d0215192267016e2b"
@@ -15183,7 +15188,7 @@ which@^2.0.1:
   dependencies:
     isexe "^2.0.0"
 
-why-is-node-running@^2.2.1:
+why-is-node-running@^2.2.1, why-is-node-running@^2.2.2:
   version "2.2.2"
   resolved "https://registry.yarnpkg.com/why-is-node-running/-/why-is-node-running-2.2.2.tgz#4185b2b4699117819e7154594271e7e344c9973e"
   integrity sha512-6tSwToZxTOcotxHeA+qGCq1mVzKR3CwcJGmVcY+QE8SHy6TnpFnh8PAvPNHYr7EcuVeG0QSMxtYCuO1ta/G/oA==