Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,36 @@
# codextest
这是一个由codex生成的项目。

这是一个由Codex生成的示例项目,包含一个简单的 `video_matcher` 包,用于在较长视频中寻找短视频片段的位置。

## 功能
- 使用 OpenCV 提取视频帧的灰度均值作为特征。
- 通过动态规划实现的动态时间规整(DTW)来比较特征序列。
- 提供命令行工具 `video_matcher.cli`,可以在两个视频之间寻找最佳匹配位置。

## 安装依赖
示例代码依赖以下第三方库:
- `opencv-python`
- `SpeechRecognition`(用于语音转文字;注意 `video_matcher/__init__.py` 在导入包时会直接引入该库,因此它并非可选依赖)

在实际使用前请确保环境中已安装这些库,例如:

```bash
pip install opencv-python SpeechRecognition
```

## 使用方法
```
python -m video_matcher.cli path/to/long.mp4 path/to/short.mp4 --stride 5 --window 10
```
输出示例:
```
Extracting features from long video...
Extracting features from short video...
Matching videos...
Best match starts at frame 123 with cost 456.7
```

该输出表示在长视频的第 123 帧附近找到与短视频最相似的片段,匹配代价为 456.7(值越小越相似)。

## 限制
此项目仅为演示用途,真实场景可能需要更复杂的特征和算法,例如关键帧匹配、语音识别或深度学习特征等,用户可根据需要自行扩展。
11 changes: 11 additions & 0 deletions video_matcher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""Video matcher package."""

from .frame_extractor import extract_frame_features
from .matching import find_best_match
from .audio_transcriber import transcribe_audio

__all__ = [
"extract_frame_features",
"find_best_match",
"transcribe_audio",
]
27 changes: 27 additions & 0 deletions video_matcher/audio_transcriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Audio transcription utilities."""

from typing import List
import speech_recognition as sr


def transcribe_audio(audio_path: str) -> List[str]:
    """Transcribe an audio file into a list of whitespace-separated tokens.

    The recorded audio is sent to Google's speech recognition service with
    ``language="zh-CN"``.  If the audio is unintelligible the result is an
    empty list.

    Note that the final ``str.split()`` splits on whitespace, not sentence
    boundaries; Chinese transcriptions typically contain no spaces, so the
    returned list will usually hold the whole transcription as a single
    element rather than individual sentences.

    Parameters
    ----------
    audio_path:
        Path to an audio file extracted from a video.  Must be in a format
        that ``speech_recognition.AudioFile`` can open — TODO confirm the
        formats used by callers.

    Returns
    -------
    List[str]
        Whitespace-separated tokens recognized from the audio; empty when
        nothing could be recognized.
    """
    recognizer = sr.Recognizer()
    with sr.AudioFile(audio_path) as source:
        # Load the entire file into memory as a single AudioData chunk.
        audio = recognizer.record(source)
    try:
        text = recognizer.recognize_google(audio, language="zh-CN")
    except sr.UnknownValueError:
        # Unintelligible audio is treated as "no text", not an error.
        # NOTE(review): service/network failures (e.g. RequestError) are
        # intentionally NOT caught here and will propagate to the caller.
        text = ""
    return text.split()
31 changes: 31 additions & 0 deletions video_matcher/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Command line interface for video matching."""

import argparse
from pathlib import Path
from . import extract_frame_features, find_best_match


def main(argv=None):
    """Command line entry point: extract features from both videos and report the best match.

    Parameters
    ----------
    argv:
        Optional list of argument strings; ``None`` means use ``sys.argv``.
    """
    parser = argparse.ArgumentParser(description="Match a short video to a long video")
    parser.add_argument("long_video", type=Path, help="Path to the long video")
    parser.add_argument("short_video", type=Path, help="Path to the short video")
    parser.add_argument("--stride", type=int, default=1, help="Frame stride for feature extraction")
    parser.add_argument("--window", type=int, default=10, help="Search window size")
    opts = parser.parse_args(argv)

    print("Extracting features from long video...")
    long_feats = extract_frame_features(str(opts.long_video), stride=opts.stride)
    print("Extracting features from short video...")
    short_feats = extract_frame_features(str(opts.short_video), stride=opts.stride)

    print("Matching videos...")
    start, match_cost = find_best_match(long_feats, short_feats, window=opts.window)
    if start < 0:
        # find_best_match signals "no candidate segment fit" with index -1.
        print("No match found")
    else:
        print(f"Best match starts at frame {start} with cost {match_cost}")


if __name__ == "__main__":
    main()
38 changes: 38 additions & 0 deletions video_matcher/frame_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Utilities for extracting frame features from videos."""

from typing import List
import cv2


def extract_frame_features(video_path: str, stride: int = 1) -> List[float]:
    """Extract simple per-frame features from ``video_path``.

    The feature for each sampled frame is its mean grayscale intensity —
    cheap to compute and adequate for coarse sequence matching.

    Parameters
    ----------
    video_path:
        Path to the video file.
    stride:
        Interval between sampled frames; must be >= 1.  Defaults to 1
        (use every frame).

    Returns
    -------
    List[float]
        Mean grayscale intensity of each sampled frame, in frame order.

    Raises
    ------
    ValueError
        If ``stride`` is smaller than 1.
    IOError
        If the video cannot be opened.
    """
    if stride < 1:
        # stride == 0 would raise ZeroDivisionError in ``index % stride``
        # below, and a negative stride silently samples every frame;
        # reject both explicitly.
        raise ValueError(f"stride must be >= 1, got {stride}")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video: {video_path}")

    features: List[float] = []
    index = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if index % stride == 0:
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                features.append(float(gray.mean()))
            index += 1
    finally:
        # Release the capture even if decoding raises mid-stream, so the
        # underlying file/device handle is never leaked.
        cap.release()
    return features
79 changes: 79 additions & 0 deletions video_matcher/matching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Algorithms for matching two sequences of frame features."""

from typing import List, Tuple


def dynamic_time_warping(seq_a: List[float], seq_b: List[float]) -> Tuple[float, List[Tuple[int, int]]]:
    """Align two feature sequences with dynamic time warping.

    Parameters
    ----------
    seq_a, seq_b:
        Feature sequences to align.

    Returns
    -------
    Tuple containing the accumulated squared-difference DTW cost and the
    alignment path as ``(index_a, index_b)`` pairs in forward order.
    """
    len_a, len_b = len(seq_a), len(seq_b)
    inf = float('inf')

    # dp[i][j] holds the cheapest cost of aligning the first i elements of
    # seq_a with the first j elements of seq_b; borders stay infinite so
    # only paths anchored at (0, 0) are reachable.
    dp = [[inf] * (len_b + 1) for _ in range(len_a + 1)]
    dp[0][0] = 0.0

    for i, a in enumerate(seq_a, start=1):
        prev_row, row = dp[i - 1], dp[i]
        for j, b in enumerate(seq_b, start=1):
            diff = a - b
            step = diff * diff
            row[j] = step + min(prev_row[j - 1], prev_row[j], row[j - 1])

    # Walk backwards from (len_a, len_b) to recover the alignment path,
    # preferring the diagonal move on ties.
    path: List[Tuple[int, int]] = []
    i, j = len_a, len_b
    while i > 0 and j > 0:
        path.append((i - 1, j - 1))
        diag, left, up = dp[i - 1][j - 1], dp[i][j - 1], dp[i - 1][j]
        if diag <= left and diag <= up:
            i, j = i - 1, j - 1
        elif left < up:
            j -= 1
        else:
            i -= 1
    path.reverse()
    return dp[len_a][len_b], path


def find_best_match(long_seq: List[float], short_seq: List[float], window: int = 10) -> Tuple[int, float]:
    """Locate the start index in ``long_seq`` whose segment best matches ``short_seq``.

    Candidate start offsets are sampled every ``window`` positions — i.e.
    ``window`` is the sampling step between candidates, not a segment
    length — and each candidate segment of ``len(short_seq)`` frames is
    scored with DTW.

    Parameters
    ----------
    long_seq:
        Feature sequence from the longer video.
    short_seq:
        Feature sequence from the shorter video.
    window:
        Step between candidate start offsets.  Larger values search faster
        but may skip the true best offset.

    Returns
    -------
    Tuple[int, float]
        Start index of the best match and its DTW cost; ``(-1, inf)`` when
        no candidate segment of the required length fits in ``long_seq``.
    """
    target_len = len(short_seq)
    best_index, best_cost = -1, float('inf')
    for start in range(0, max(1, len(long_seq) - target_len + 1), window):
        candidate = long_seq[start : start + target_len]
        if len(candidate) < target_len:
            # Remaining tail is too short to hold a full segment.
            break
        score, _ = dynamic_time_warping(candidate, short_seq)
        if score < best_cost:
            best_index, best_cost = start, score
    return best_index, best_cost