Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/repco-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"dependencies": {
"@ipld/car": "^5.1.0",
"@noble/hashes": "^1.2.0",
"@plussub/srt-vtt-parser": "^1.1.1",
"@types/b4a": "^1.6.0",
"@ucans/ucans": "^0.11.2",
"b4a": "^1.6.1",
Expand All @@ -44,9 +45,11 @@
"@types/tempy": "^0.3.0",
"brittle": "^2.4.0",
"esbuild": "^0.14.51",
"express": "^4.18.1",
"get-port": "^6.1.2",
"nanobench-utils": "^1.0.1",
"split2": "^4.2.0",
"tempy": "^3.0.0"
"tempy": "^3.0.0",
"why-is-node-running": "^2.2.2"
}
}
41 changes: 41 additions & 0 deletions packages/repco-core/src/datasource.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { parse as parseSubtitle } from '@plussub/srt-vtt-parser'
import { EventEmitter } from 'node:events'
import { createLogger } from 'repco-common'
import { fetch } from 'undici'
import { EntityForm } from './entity.js'
import { IngestOutcome, IngestState } from './ingest.js'
import { DataSourcePluginRegistry } from './plugins.js'
Expand Down Expand Up @@ -644,9 +646,48 @@ async function persistAndMapSourceRecords(
errors.push(error)
}
}
await Promise.all(entities.map(processBeforeSave))
return { entities, errors }
}

/**
 * Enrich an entity in place before it is saved.
 *
 * Only `Transcript` entities are touched: when a transcript has no `text` but
 * does have a `subtitleUrl`, the subtitle file is fetched and its text is
 * written to `content.text`.
 *
 * Fetching is best-effort. A failure is logged as a warning and the entity is
 * left unchanged, so this function never rejects because of a bad subtitle URL.
 *
 * @param entity - The entity form to process; mutated in place.
 */
async function processBeforeSave(entity: EntityForm): Promise<void> {
  if (entity.type !== 'Transcript') return

  const content = entity.content
  // Nothing to do when the text is already present or there is no URL to load it from.
  if (content.text || !content.subtitleUrl) return

  const url = content.subtitleUrl
  try {
    content.text = await fetchSubtitle(url)
    log.debug(`Fetched subtitle text for ${content.uid} from ${url}`)
  } catch (error) {
    log.warn({
      error,
      message: `Failed to fetch subtitle for ${content.uid} from ${url}`,
    })
  }
}

/**
 * Download a subtitle file and return the text of all its entries, joined by
 * newlines.
 *
 * The response body is parsed with `parseSubtitle`; timing information is
 * dropped and only each entry's `text` is kept.
 *
 * The whole operation (request and body read) is aborted after a fixed
 * timeout. The timer is cleared on every exit path so that a request that
 * fails early does not leave a pending timer behind.
 *
 * @param url - Location of the subtitle file.
 * @returns The concatenated subtitle text.
 * @throws Error if the response status is not OK. Errors raised by `fetch`
 *   (including the abort on timeout) and by the parser propagate unchanged.
 */
async function fetchSubtitle(url: string): Promise<string> {
  const timeout = 2000
  const controller = new AbortController()
  const id = setTimeout(() => controller.abort(), timeout)
  try {
    const res = await fetch(url, { signal: controller.signal })
    if (!res.ok) {
      throw new Error(`Failed to fetch: ${res.status} ${res.statusText}`)
    }
    const text = await res.text()
    const { entries } = parseSubtitle(text)
    return entries.map(({ text }) => text).join('\n')
  } finally {
    clearTimeout(id)
  }
}

// Recreate all entities originating from a particular DataSource
//
// This traverses all source records for a datasource and recreates entity revisions for each.
Expand Down
164 changes: 164 additions & 0 deletions packages/repco-core/test/subtitles.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */

import test from 'brittle'
import express from 'express'
import getPort from 'get-port'
import { setup } from './util/setup.js'
import { DataSource, repoRegistry } from '../lib.js'
import {
BaseDataSource,
DataSourceDefinition,
FetchUpdatesResult,
ingestUpdatesFromDataSources,
SourceRecordForm,
} from '../src/datasource.js'
import { EntityForm } from '../src/entity.js'
import { DataSourcePlugin, DataSourcePluginRegistry } from '../src/plugins.js'

/** Configuration handed to the test data source. */
interface Config {
  /** Base URL of the subtitle server; `/subtitle1.vtt` is appended to it. */
  subtitleHost: string
}

/** Plugin registered under `ds:test` that creates `TestDataSource` instances. */
class TestDataSourcePlugin implements DataSourcePlugin {
  /** Static description of this plugin; a fresh object is returned on each access. */
  get definition() {
    const pluginDefinition = { name: 'test', uid: 'ds:test' }
    return pluginDefinition
  }

  /** Build a data source from the untyped config, treating it as a `Config`. */
  createInstance(config: any) {
    const subtitleConfig = config as Config
    return new TestDataSource(subtitleConfig)
  }
}

/**
 * Data source used by the subtitle test.
 *
 * On the first poll (cursor `0`) it yields two source records: a `MediaAsset`
 * and a `Transcript` that has an empty `text` and a `subtitleUrl` pointing at
 * the configured subtitle host. Every later poll yields no records. Each
 * record body is the JSON-serialized entity form, which `mapSourceRecord`
 * parses back.
 */
class TestDataSource extends BaseDataSource implements DataSource {
  public subtitleConfig: Config

  constructor(config: Config) {
    super()
    this.subtitleConfig = config
  }

  get definition(): DataSourceDefinition {
    return {
      name: 'TestDataSource',
      uid: 'ds:test:1',
      pluginUid: 'ds:test',
    }
  }

  /** Only URIs with the `test:` prefix are claimed by this data source. */
  canFetchUri(uid: string): boolean {
    return uid.startsWith('test:')
  }

  /**
   * Return the fixture records for the given cursor.
   *
   * @param cursorString - Cursor from the previous poll; `null` is treated as `0`.
   * @returns The records for this poll and the cursor incremented by one.
   */
  async fetchUpdates(cursorString: string | null): Promise<FetchUpdatesResult> {
    const cursor = Number(cursorString)
    const records: FetchUpdatesResult['records'] = []
    if (cursor === 0) {
      const media: EntityForm = {
        type: 'MediaAsset',
        headers: {
          EntityUris: ['media1'],
        },
        content: {
          title: 'media1',
          description: 'boo',
          mediaType: 'video',
        },
      }
      const record1 = {
        sourceUri: 'subtitle1',
        contentType: 'text',
        body: JSON.stringify(media),
        sourceType: 'entity',
      }
      records.push(record1)

      const url = this.subtitleConfig.subtitleHost + '/subtitle1.vtt'
      // `text` is left empty on purpose so that ingest has to fetch it from `subtitleUrl`.
      const entity: EntityForm = {
        type: 'Transcript',
        content: {
          text: '',
          subtitleUrl: url,
          author: 'asdf',
          engine: 'foo',
          license: 'bar',
          language: 'de',
          MediaAsset: {
            uri: 'media1',
          },
        },
      }
      const record2 = {
        sourceUri: 'subtitle1',
        contentType: 'text',
        body: JSON.stringify(entity),
        sourceType: 'entity',
      }
      records.push(record2)
    }

    return {
      cursor: '' + (cursor + 1),
      records,
    }
  }

  /** Fetching by URI is not supported by this fixture; it always rejects. */
  async fetchByUri(_uid: string): Promise<SourceRecordForm[] | null> {
    throw new Error('Failed to fetch')
  }

  /** Parse the record body, which holds a JSON-serialized entity form. */
  async mapSourceRecord(record: SourceRecordForm): Promise<EntityForm[]> {
    const form = JSON.parse(record.body) as EntityForm
    return [form]
  }
}

// Ingests a Transcript that has an empty `text` and a `subtitleUrl`, then
// checks that the stored transcript text is the text of the subtitle file
// served by a local HTTP server.
test('subtitle fetch after ingest', async (assert) => {
  const VTT = `WEBVTT
00:00:00.500 --> 00:00:02.000
The Web is always changing

00:00:02.500 --> 00:00:04.300
and the way we access it is changing`

  // Serve the subtitle fixture from a throwaway local server.
  const subtitlePort = await getPort()
  const subtitleApp = express()
  subtitleApp.get('/subtitle1.vtt', (_req, res) => {
    res.header('content-type', 'text/vtt')
    res.send(VTT)
  })
  const subtitleServer = subtitleApp.listen(subtitlePort)
  const subtitleHost = `http://localhost:${subtitlePort}`

  try {
    const prisma = await setup(assert)
    const repo = await repoRegistry.create(prisma, 'test')
    const plugins = new DataSourcePluginRegistry()
    plugins.register(new TestDataSourcePlugin())
    await repo.dsr.create(
      repo.prisma,
      plugins,
      'ds:test',
      { subtitleHost },
      repo.did,
    )
    await ingestUpdatesFromDataSources(repo)

    const uri = 'media1'
    const entities = await prisma.mediaAsset.findMany({
      where: { Revision: { entityUris: { has: uri } } },
      include: {
        Transcripts: true,
      },
    })
    assert.is(entities.length, 1)
    const media = entities[0]
    assert.is(media.Transcripts.length, 1)
    const transcript = media.Transcripts[0]
    const expectedText = [
      'The Web is always changing',
      'and the way we access it is changing',
    ].join('\n')
    assert.is(transcript.text, expectedText)
  } finally {
    // Shut the server down on every exit path: if setup, ingest or a property
    // access above throws, an open listener would otherwise be left behind.
    subtitleServer.closeAllConnections()
    subtitleServer.close()
  }
})
3 changes: 2 additions & 1 deletion packages/repco-core/test/util/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const defaultLogFn = (...args: any[]) =>

export async function setup(test: Test) {
const { teardown, databaseUrl } = await setupDb(defaultLogFn)
test.teardown(teardown, { order: 0 })
test.teardown(teardown, { order: 2 })
process.env.DATABASE_URL = databaseUrl
const log: Prisma.LogDefinition[] = []
if (process.env.QUERY_LOG) log.push({ emit: 'event', level: 'query' })
Expand All @@ -41,6 +41,7 @@ export async function setup(test: Test) {
test.comment(`QUERY: ${e.query} ${e.params}`)
}
})
test.teardown(() => prisma.$disconnect(), { order: 1 })
return prisma
}

Expand Down
7 changes: 6 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2427,6 +2427,11 @@
resolved "https://registry.yarnpkg.com/@pkgjs/parseargs/-/parseargs-0.11.0.tgz#a77ea742fab25775145434eb1d2328cf5013ac33"
integrity sha512-+1VkjdD0QBLPodGrJUeqarH8VAIvQODIbwh9XpP5Syisf7YoQgsJKPNFoqqLQlu+VQ/tVSshMR6loPMn8U+dPg==

"@plussub/srt-vtt-parser@^1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@plussub/srt-vtt-parser/-/srt-vtt-parser-1.1.1.tgz#62103c1e57eea3422afe2bff1a296774c1dc1027"
integrity sha512-uHZLKANNVoXSEjLRtkyyDFXaiC86ZNnjPnHve5bs5RSiXV0DfVlUvXMZXzfR7X+z2OVdoLFn6oYsS2a/VkN8Ew==

"@prisma/client@^4.12.0":
version "4.16.2"
resolved "https://registry.yarnpkg.com/@prisma/client/-/client-4.16.2.tgz#3bb9ebd49b35c8236b3d468d0215192267016e2b"
Expand Down Expand Up @@ -15183,7 +15188,7 @@ which@^2.0.1:
dependencies:
isexe "^2.0.0"

why-is-node-running@^2.2.1:
why-is-node-running@^2.2.1, why-is-node-running@^2.2.2:
version "2.2.2"
resolved "https://registry.yarnpkg.com/why-is-node-running/-/why-is-node-running-2.2.2.tgz#4185b2b4699117819e7154594271e7e344c9973e"
integrity sha512-6tSwToZxTOcotxHeA+qGCq1mVzKR3CwcJGmVcY+QE8SHy6TnpFnh8PAvPNHYr7EcuVeG0QSMxtYCuO1ta/G/oA==
Expand Down