-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathstream_worker.cpp
More file actions
107 lines (90 loc) · 3.35 KB
/
stream_worker.cpp
File metadata and controls
107 lines (90 loc) · 3.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
//
// Created by pengx on 2026/2/10.
//
#include "stream_worker.hpp"

#include <utility>

#include <QDebug>
#include <QMetaObject>
#include <QThread>
#include <QTimer>

#include "ps_muxer.hpp"
/**
 * @brief Creates a worker whose ring buffer holds up to @p bufferSize frames.
 *
 * The ring indices in this file are advanced with `% capacity`, so a capacity
 * of zero would divide by zero on the first frame. A requested size of 0 is
 * therefore raised to 1.
 *
 * @param bufferSize Maximum number of frames kept in the ring buffer.
 * @param parent     Forwarded to the QObject base class.
 */
StreamWorker::StreamWorker(const size_t bufferSize, QObject* parent) : QObject{parent} {
    const size_t capacity = bufferSize > 0 ? bufferSize : 1;
    _ringBuffer.capacity = capacity;
    // Reserve the storage once so filling the ring never reallocates.
    _ringBuffer.frames.reserve(capacity);
}
// Defaulted destructor: no manual cleanup is done here, every member is
// destroyed by its own destructor.
StreamWorker::~StreamWorker() = default;
void StreamWorker::frameReceived(const std::vector<uint8_t>& h264Data, const uint32_t pts) {
QMutexLocker lock(&_mutex);
bool is_sps_pps = false;
if (h264Data.size() > 4) {
const uint8_t* data = h264Data.data();
// 查找NALU类型
for (size_t i = 0; i < h264Data.size() - 4; i++) {
if (data[i] == 0x00 && data[i + 1] == 0x00 &&
data[i + 2] == 0x00 && data[i + 3] == 0x01) {
const uint8_t nal_type = data[i + 4] & 0x1F;
if (nal_type == 7 || nal_type == 8) {
// SPS或PPS
is_sps_pps = true;
break;
}
}
}
}
// 如果缓冲区已满且不是SPS/PPS,才丢弃最旧帧
if (_ringBuffer.count >= _ringBuffer.capacity && !is_sps_pps) {
qWarning() << "StreamWorker ring buffer full, dropping oldest frame. Size:" << h264Data.size() << "bytes";
_ringBuffer.readIndex = (_ringBuffer.readIndex + 1) % _ringBuffer.capacity;
_ringBuffer.count--;
}
// 写入新帧
VideoFrame frame;
frame.data = h264Data; // 拷贝数据
frame.pts = pts;
if (_ringBuffer.frames.size() < _ringBuffer.capacity) {
_ringBuffer.frames.push_back(frame);
} else {
_ringBuffer.frames[_ringBuffer.writeIndex] = frame;
}
_ringBuffer.writeIndex = (_ringBuffer.writeIndex + 1) % _ringBuffer.capacity;
if (_ringBuffer.count < _ringBuffer.capacity) {
++_ringBuffer.count;
}
// 触发处理(如果未在处理中)
if (!_isProcessing && _ringBuffer.count > 0) {
_isProcessing = true;
lock.unlock(); // 先解锁再emit,避免死锁
handleFrame();
}
}
/**
 * @brief Consumes the oldest queued frame, muxes it, paces the output and
 *        schedules the next run.
 *
 * When the ring buffer is empty the processing flag is cleared and the chain
 * stops. Otherwise one frame is removed, passed to
 * PsMuxer::get()->writeVideoFrame(), followed by a short size-dependent sleep,
 * and the function re-posts itself with a zero-delay single-shot timer.
 * Frames shorter than 4 bytes are skipped with a warning.
 */
void StreamWorker::handleFrame() {
    // Pacing table: payload size thresholds (bytes) and the delay (ms) applied
    // after sending a frame larger than the threshold.
    constexpr size_t kLargeFrameBytes = 50000;
    constexpr size_t kMediumFrameBytes = 20000;
    constexpr unsigned long kLargeFrameDelayMs = 7;
    constexpr unsigned long kMediumFrameDelayMs = 3;
    constexpr unsigned long kSmallFrameDelayMs = 1;

    VideoFrame frame;
    {
        QMutexLocker lock(&_mutex);
        if (_ringBuffer.count == 0) {
            // Nothing left: mark the consumer idle so a new chain can start.
            _isProcessing = false;
            return;
        }
        // The slot is given up right below, so take its payload by move
        // instead of deep-copying the whole frame while holding the mutex.
        frame = std::move(_ringBuffer.frames[_ringBuffer.readIndex]);
        _ringBuffer.readIndex = (_ringBuffer.readIndex + 1) % _ringBuffer.capacity;
        --_ringBuffer.count;
    }

    const auto frameSize = frame.data.size();
    if (frameSize < 4) {
        qWarning() << "丢弃过小的帧,大小: " << frameSize;
        QTimer::singleShot(0, this, &StreamWorker::handleFrame);
        return;
    }

    // Mux the frame outside the lock. The original author marks this as the
    // time-consuming step intended for the worker thread.
    PsMuxer::get()->writeVideoFrame(frame.data.data(), frame.pts, frameSize);

    // Rate control: wait longer after bigger frames so the send buffer has
    // time to drain (large frames are presumably IDR, small ones P-frames).
    if (frameSize > kLargeFrameBytes) {
        QThread::msleep(kLargeFrameDelayMs);
    } else if (frameSize > kMediumFrameBytes) {
        QThread::msleep(kMediumFrameDelayMs);
    } else {
        QThread::msleep(kSmallFrameDelayMs);
    }

    // Continue with the next frame through the event loop rather than by
    // direct recursion, because this path performs I/O.
    QTimer::singleShot(0, this, &StreamWorker::handleFrame);
}