diff --git a/bmf/modules/realtime_delay_filter.py b/bmf/modules/realtime_delay_filter.py new file mode 100644 index 00000000..bfdc31f3 --- /dev/null +++ b/bmf/modules/realtime_delay_filter.py @@ -0,0 +1,93 @@ +from bmf import Module, Log, LogLevel, InputType, ProcessResult, Packet, Timestamp, \ + VideoFrame, AudioFrame # 导入 VideoFrame 和 AudioFrame 只是为了示例,实际逻辑中不直接使用 +import time +import sys + +class RealtimeDelayFilter(Module): + ''' + BMF Python 模块,模拟 FFmpeg 的 -re 功能, + 根据数据包的时间戳控制发送速率,实现实时读取。 + ''' + def __init__(self, node, option=None): + ''' + 模块初始化方法 + Args: + node: BMF 节点对象 + option: 模块选项,可以通过图配置传递参数 + ''' + self.node_ = node + self.option_ = option + Log.log_node(LogLevel.INFO, self.node_, f"RealtimeReadModule initialized with option: {option}") + + # 初始化时间相关变量 + self._start_real_time = None # 记录处理第一个数据包时的实际系统时间 + self._first_packet_timestamp = Timestamp.UNSET # 记录第一个数据包的时间戳 (微秒单位) + + def process(self, task): + ''' + 模块处理方法,BMF 引擎会周期性调用此方法来处理输入数据 + Args: + task: BMF 任务对象,包含输入和输出队列 + Returns: + ProcessResult: 模块处理结果状态 + ''' + # 遍历任务的所有输入队列 + # 对于模拟 -re 的模块,通常只有一个输入流 + for (input_id, input_packets) in task.get_inputs().items(): + + # 获取与输入队列对应的输出队列 + output_packets = task.get_outputs()[input_id] + + # 处理输入队列中的所有数据包 + while not input_packets.empty(): + # 从输入队列获取一个数据包 + pkt = input_packets.get() + + # 处理 EOS (End of Stream) 数据包 + if pkt.timestamp == Timestamp.EOF: + Log.log_node(LogLevel.DEBUG, self.node_, f"Receive EOF on input {input_id}") + # 将 EOS 包发送到输出队列 + output_packets.put(Packet.generate_eof_packet()) + # 设置任务状态为 DONE,通知 BMF 引擎此任务已完成 + task.timestamp = Timestamp.DONE + # 返回 OK,表示任务处理顺利结束 + return ProcessResult.OK + + # 处理有效的数据包 (非 EOF 且时间戳有效) + if pkt.defined() and pkt.timestamp != Timestamp.UNSET: + current_pkt_timestamp = pkt.timestamp # 数据包的时间戳,单位为微秒 + + # 如果是处理的第一个有效数据包,记录开始时间和时间戳 + if self._first_packet_timestamp == Timestamp.UNSET: + self._start_real_time = time.time() # 记录实际开始处理的时间 (秒) + self._first_packet_timestamp = current_pkt_timestamp + Log.log_node(LogLevel.INFO, self.node_, f"First packet timestamp: {self._first_packet_timestamp} us, Start real time: {self._start_real_time:.4f} s") + + # 计算数据包相对于第一个包的时间差(微秒) + pkt_time_diff_us = current_pkt_timestamp - self._first_packet_timestamp + + # 计算数据包相对于开始时间点理论上应该发送的实际时间(秒) + # 将微秒转换为秒:除以 1,000,000 + expected_real_time_sec = self._start_real_time + pkt_time_diff_us / 1000000.0 + + # 获取当前的实际系统时间(秒) + current_real_time_sec = time.time() + + # 计算需要等待的时间(秒) + # 如果理论发送时间在未来,则需要等待 + wait_time_sec = expected_real_time_sec - current_real_time_sec + + if wait_time_sec > 0: + # 进行等待,模拟实时播放的延迟 + # Log.log_node(LogLevel.DEBUG, self.node_, f"Waiting for {wait_time_sec:.4f} seconds for packet with timestamp {current_pkt_timestamp} us") + time.sleep(wait_time_sec) + + # 将数据包放入输出队列,发送给下游模块 + output_packets.put(pkt) + + # 对于无效的数据包(未定义或时间戳为 UNSET),直接忽略或根据需求处理 + # 在这个简单实现中,我们只处理有效数据包和 EOF + + # 如果循环结束,输入队列已空,但没有收到 EOS,说明当前没有更多数据可处理, + # 或者正在等待合适的发送时间。返回 AGAIN 通知 BMF 引擎稍后再次调用 process 方法。 + return ProcessResult.OK diff --git a/bmf/test/realtime_delay_module/readme.md b/bmf/test/realtime_delay_module/readme.md new file mode 100644 index 00000000..a6e95393 --- /dev/null +++ b/bmf/test/realtime_delay_module/readme.md @@ -0,0 +1,120 @@ +# 实时延迟滤镜模块 (Python) + +## 简介 + +本文档提供了关于 Babit 多媒体框架 (BMF) 中 `realtime_delay_filter` Python 模块的概述。此模块旨在模拟 FFmpeg 的 `-re`(以原始帧率读取输入)选项的行为,允许 BMF 图根据数据包的时间戳以实时速度处理多媒体流。 + +## 功能 + +`realtime_delay_filter` 模块作为一个直通(passthrough)滤镜,用于控制多媒体数据包的流速。它读取输入的包,并根据它们的显示时间戳(PTS)和处理开始以来的实际经过时间,引入延迟,以确保数据包的释放速度与时间戳所指示的实际播放速度同步。 + +这在以下场景中特别有用: + +* 模拟实时流,其中数据必须以特定速率处理。 +* 使用受实时限制的输入测试下游模块或系统。 +* 分析多媒体流时关注时序和同步。 + +## 实现细节 + +* **语言:** Python +* **基类:** 继承自 `bmf.Module`。 +* **核心逻辑:** 模块使用 `time.time()` 函数跟踪实际时间,并将其与输入数据包的微秒单位时间戳(`Packet.timestamp`)进行比较。如果一个包在其理论上的实时显示时间点(根据其时间戳相对于第一个包的时间戳和处理开始时间计算得出)之前到达,模块将使用 `time.sleep()` 暂停执行所需的时长。 +* **时间戳单位:** 假设输入包的时间戳单位是微秒(这与许多 BMF 内置模块如 `c_ffmpeg_decoder` 的标准一致)。 +* **数据包处理:** 模块从其输入队列读取包,应用实时延迟逻辑后,将原始包(不修改其内容)放入相应的输出队列。 +* **流结束 (EOF):** 正确处理 `bmf.Timestamp.EOF` 包,以向所有下游模块和 BMF 图通知流已结束。 + +## 使用方法 + +### 在 BMF 图中使用 + +在构建 BMF 图时,可以通过指定模块名称 `"realtime_delay_filter"` 来创建模块节点。 + +#### Python API 示例 + +以下是一个使用 `realtime_delay_filter` 模块的简单 BMF 图示例,用于以实时方式处理视频文件: + +```python +import bmf +import sys +import os +import time + +# 定义输入和输出文件路径 +input_file = "../../files/test.mp4" # 请根据你的测试文件实际位置调整路径 +output_file = "./realtime_output_with_filter.mp4" + +# --- (可选) 输入文件检查 --- +# if not os.path.exists(input_file): +# print(f"错误:找不到输入文件: {input_file}") +# sys.exit(1) +# print(f"开始执行 BMF 图,输入文件: {input_file}") +# print(f"输出文件将保存到: {output_file}") +# --- 结束可选 --- + +try: + # 创建一个 BMF 图对象 + graph = bmf.graph() + + # 解码输入视频文件 + decoded_streams = graph.decode({'input_path': input_file}) + + # 选择视频流 + video_stream = decoded_streams['video'] + + # 通过 realtime_delay_filter 模块处理视频流 + # 模块名称必须与放置在 bmf/modules/ 目录下的文件名一致(不带 .py 后缀) + processed_video_stream = video_stream.module("realtime_delay_filter", option={}) + + # 编码处理后的视频流到输出文件 + bmf.encode( + processed_video_stream, # 处理后的视频流 + None, # 在此简单示例中不处理音频流 + { + "output_path": output_file, + "video_params": { + "codec": "h264", + "preset": "veryfast", # 编码速度预设 + "crf": 23 # 质量控制参数 + }, + "format": "mp4", # 输出文件格式 + "loglevel": "info" # 设置日志级别 + } + ) + + # 运行图并测量执行时间 + print("正在运行带有 realtime_delay_filter 的 BMF 图...") + start_time = time.time() + graph.run() + end_time = time.time() + execution_time = end_time - start_time + + print("BMF 图执行完成。") + print(f"输出文件已保存到: {output_file}") + print(f"总执行时间: {execution_time:.4f} 秒") + + +except Exception as e: + print(f"\n--- BMF 图执行错误 ---") + print(f"BMF 图执行过程中发生错误: {e}") + print("请检查详细错误信息和 BMF 日志。") + print("-----------------------------------") +``` + +*注意:请调整示例中 `input_file` 的路径,使其正确指向你的测试视频文件(例如,相对于测试脚本位于 `../../files/test.mp4`)。* + +## 选项 + +目前,`realtime_delay_filter` 模块不接受通过 BMF 图中的 `option` 参数进行配置的任何选项。它完全基于输入数据包的时间戳和系统时钟进行操作。 + +*(如果你将来添加选项,例如速度乘数,请在此处进行文档说明。)* + +## 输入/输出流 + +* **输入:** 接受一个或多个输入流。它设计用于处理包含有效时间戳的数据包流(例如,来自解码器的视频或音频流)。 +* **输出:** 为每个输入流产生相应的输出流,在应用实时延迟后直通原始数据包。输出流的数量和类型与输入流匹配。 + +## 注意事项与限制 + +* **时序精度:** 实时模拟依赖于 Python 的 `time.sleep()`,这可能无法提供高精度的延迟,特别是对于非常短的时间间隔。实际处理时间可能会由于系统调度和开销而略微超过理论时长。 +* **时间戳单位:** 模块假定输入时间戳单位是微秒。请确保上游模块生成的时间戳单位正确,以实现准确的实时模拟。 +* **多流同步:** 尽管模块处理来自多个输入流的数据包,但延迟是根据每个数据包的时间戳相对于开始时间独立应用的。此基本实现并未明确处理复杂的多流间同步细节(超出基于时间戳的简单 pacing)。 diff --git a/bmf/test/realtime_delay_module/test_realtime.py b/bmf/test/realtime_delay_module/test_realtime.py new file mode 100644 index 00000000..761279bf --- /dev/null +++ b/bmf/test/realtime_delay_module/test_realtime.py @@ -0,0 +1,69 @@ +import bmf +import sys +import os +import time # 导入 time 模块用于计时 + +# 定义输入和输出文件路径 +input_file = "./test.mp4" # !!! 请确保在脚本同目录下有视频文件 +output_file = "./realtime_output_simple_working.mp4" # 输出文件名 + +# 检查输入文件是否存在,如果不存在则退出并提示 +if not os.path.exists(input_file): + print(f"错误:找不到输入文件: {input_file}") + print("请确保在脚本同目录下有一个名为 test.mp4 的视频文件。") + sys.exit(1) + +print(f"开始执行 BMF 图,输入文件: {input_file}") +print(f"输出文件将保存到: {output_file}") + +try: + # 创建一个 BMF 图对象 + graph = bmf.graph() + + # === 图节点和连接 === + + # 1. 使用内置解码器解码输入文件 + # graph.decode() 返回一个字典,包含 'video' 和 'audio' 流(如果存在) + decoded_streams = graph.decode({'input_path': input_file}) + + # 获取解码器输出的视频流对象 + video_stream = decoded_streams['video'] + + # 2. 将视频流输入到实时读取 Python 模块 + processed_video_stream = video_stream.module("realtime_delay_filter", option={}) + + # 3. 将模块处理后的视频流输入给编码器并指定输出文件 + bmf.encode( + processed_video_stream, # 模块输出的视频流 + None, # 没有音频流输入 + { # 编码器选项 + "output_path": output_file, # 指定输出文件路径 + "video_params": { # 视频编码参数 + "codec": "h264", # 使用 h264 编码 + "preset": "veryfast", # 编码速度预设(影响文件大小和编码速度) + "crf": 23 # 质量控制参数 + }, + "format": "mp4", # 输出文件格式为 mp4 + "loglevel": "info" # 输出 BMF 日志信息 + } + ) + + # === 运行图并计时 === + print("运行 BMF 图...") + start_time = time.time() # 记录开始时间 + graph.run() # 启动图的执行并等待直到完成 + end_time = time.time() # 记录结束时间 + execution_time = end_time - start_time # 计算执行时间 + + # 如果运行到这里没有抛出异常,说明图执行成功 + print("BMF 图执行成功!") + print(f"输出文件已保存到: {output_file}") + print(f"BMF 图执行总耗时: {execution_time:.4f} 秒") # 打印执行时间,验证功能是否生效 + + +# 捕获执行过程中可能发生的任何异常并打印错误信息 +except Exception as e: + print(f"\n--- BMF 图执行错误 ---") + print(f"BMF 图执行过程中发生错误: {e}") + print("请检查上面输出的详细错误信息和 BMF 的日志。") + print("-----------------------------------") \ No newline at end of file