播放器开发--有界堵塞队列
这是什么
顾名思义,这是一个同时具备”有界”和”堵塞”特性的队列,专为多线程生产者-消费者场景设计。
- 有界:构造函数传入
max_size,限制队列最大长度。队列满时生产者被阻塞,防止上游无限制地生产数据撑爆内存。
- 堵塞:用
std::mutex 保障线程安全,std::condition_variable 实现挂起/唤醒——队列满时生产者等 _not_full 信号,队列空时消费者等 _not_empty 信号。
在这个播放器里,队列是流水线各阶段之间的”弹性缓冲”:
1 2
| Demuxer(生产) → PacketQueue(缓冲) → Decoder(消费) Decoder(生产) → FrameQueue(缓冲) → 渲染/音频输出(消费)
|
- 解封装快、解码慢时,包堆积在 PacketQueue,满了会反压解封装线程
- 解码快、渲染慢时,帧堆积在 FrameQueue,满了会反压解码线程
这样就自然限定了内存占用,不需要手动轮询或估算。
队列的结构和实现
项目里多处要用到队列(视频包队列、音频包队列、视频帧队列、音频帧队列),且元素类型不同(AVPacketPtr 和 FramePtr)。所以实现一个泛型基类 BaseQueue<T>,具体类型通过 typedef/继承获得:
PacketQueue = BaseQueue<AVPacketPtr>,最大容量 100
FrameQueue = BaseQueue<FramePtr>,最大容量 5,额外增加尾部保护逻辑
BaseQueue 的结构
内部成员变量
| 变量 |
作用 |
std::mutex _mutex |
互斥锁,保护 _queue 的所有读写操作 |
std::size_t _max_size |
队列容量上限,构造时指定 |
std::condition_variable _not_full |
“非满”条件变量,生产者在队列满时在此等待 |
std::condition_variable _not_empty |
“非空”条件变量,消费者在队列空时在此等待 |
std::atomic<bool> _abort |
终止标记,设为 true 后唤醒所有等待线程,用于 stop/seek 时快速退出阻塞 |
std::queue<T> _queue |
实际存储元素的标准队列 |
内部主要成员函数
队列对外提供两类接口:堵塞版和非堵塞版,分别对应不同的使用场景。
入队操作
bool push(U&& item) — 堵塞入队。如果队列已满,线程挂起等待 _not_full 信号;如果 _abort 被置位,立即返回 false。入队成功后通知 _not_empty,唤醒可能正在等待的消费者。
bool try_push(U&& item) — 非堵塞入队。队列满时直接返回 false,不等待。适合生产者想在满时做其他事情的场景。
出队操作
T pop() — 堵塞出队。队列空时挂起等待 _not_empty;_abort 置位且队列空时返回空值。取出后通知 _not_full,唤醒可能正在等待的生产者。
T try_pop() — 非堵塞出队。队列空时立即返回空值。
条件出队
bool try_pop_if(predicate, T& item) — 只在队首元素满足条件时才弹出。用于帧丢弃等需要”看一眼再决定要不要取走”的场景。
bool try_inspect_front(inspector) — 查看队首但不弹出,传入的 inspector 回调负责读取需要的字段。同样用于”先看再决定”。
状态与控制
bool empty() / std::size_t size() — 查询队列空/满状态
bool clear() — 清空队列并通知所有等待的生产者
void abort() — 置位 _abort,唤醒所有等待线程。是线程安全退出的核心机制
bool aborted() — 查询终止标记
void reset() — 重置终止标记并清空队列,用于 seek 后重新开始
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| template<typename T> class BaseQueue { public: explicit BaseQueue(size_t max_size) : _max_size(max_size) {} virtual ~BaseQueue() = default;
BaseQueue(const BaseQueue&) = delete; BaseQueue& operator=(const BaseQueue&) = delete;
template<typename U> bool try_push(U&& item) { { std::unique_lock<std::mutex> lock(_mutex); if (_queue.size() >= _max_size) { return false; } _queue.push(std::forward<U>(item)); } _not_empty.notify_one(); return true; }
template<typename U> bool push(U&& item) { { std::unique_lock<std::mutex> lock(_mutex); _not_full.wait(lock, [this]() { return _queue.size() < _max_size || _abort.load(); }); if (_abort.load()) { return false; } _queue.push(std::forward<U>(item)); } _not_empty.notify_one(); return true; }
T try_pop() { T item; { std::unique_lock<std::mutex> lock(_mutex); if (_queue.empty()) { return T(); } item = std::move(_queue.front()); _queue.pop(); } _not_full.notify_one(); return item; }
T pop() { T item; { std::unique_lock<std::mutex> lock(_mutex); _not_empty.wait(lock, [this]() { return !_queue.empty() || _abort.load(); }); if (_abort.load() && _queue.empty()) { return T(); } item = std::move(_queue.front()); _queue.pop(); } _not_full.notify_one(); return item; }
bool try_inspect_front(const std::function<void(const T&)>& inspector) { std::unique_lock<std::mutex> lock(_mutex); if (_queue.empty()) return false; inspector(_queue.front()); return true; }
bool try_pop_if(const std::function<bool(const T&)>& predicate, T& item) { { std::unique_lock<std::mutex> lock(_mutex); if (_queue.empty() || !predicate(_queue.front())) { return false; } item = std::move(_queue.front()); _queue.pop(); } _not_full.notify_one(); return true; }
bool empty() { std::lock_guard<std::mutex> lock(_mutex); return _queue.empty(); }
std::size_t size() { std::lock_guard<std::mutex> lock(_mutex); return _queue.size(); }
bool clear() { { std::lock_guard<std::mutex> lock(_mutex); while (!_queue.empty()) _queue.pop(); } _not_full.notify_all(); return true; }
void abort() { _abort.store(true); _not_full.notify_all(); _not_empty.notify_all(); }
bool aborted() const { return _abort.load(); }
void reset() { { std::lock_guard<std::mutex> lock(_mutex); _abort.store(false); while (!_queue.empty()) _queue.pop(); } _not_full.notify_all(); _not_empty.notify_all(); }
protected: std::mutex _mutex; std::size_t _max_size; std::condition_variable _not_full; std::condition_variable _not_empty; std::atomic<bool> _abort{false}; std::queue<T> _queue; };
|
关键设计细节
为什么 push/pop 用 wait 而不是 wait_for + 重试?
wait(lock, predicate) 会在条件不满足时原子地释放锁并挂起,避免”持有锁却没事可做”的浪费。当 notify 到来时,线程被唤醒并重新持有锁、检查条件。这种设计比定时轮询更高效,比忙等更省 CPU。
_abort 的作用:优雅退出
如果线程卡在 push 或 pop 上等待,外部调用 stop() 时不能直接 join——那会死锁。abort() 置位 _abort 后通知所有等待线程,被唤醒的线程检查到 _abort 为 true 后立即返回,外部再 join 就能正常退出。
notify_one vs notify_all
入队/出队后只用 notify_one,因为只需要唤醒一个等待方即可,唤醒多余的线程反而会增加锁竞争。但 clear、abort、reset 用 notify_all,因为这些操作影响所有等待者——任何等待者都不能继续按原有逻辑运行。
unique_ptr 的扩展:自定义析构函数(Custom Deleter)
std::unique_ptr 默认用 delete 销毁指针,但 FFmpeg 的对象有自己专用的释放函数——比如 av_frame_free() 和 av_packet_free()。如果直接用默认 delete,会导致未定义行为或内存泄漏。所以我们需要给 unique_ptr 装上”自定义析构函数”,在生命周期结束时分发到正确的 C 函数。
为什么需要自定义析构函数?
| 场景 |
默认 delete 的问题 |
| 资源来自 C 风格 API |
FILE* 需要 fclose(),AVFrame* 需要 av_frame_free(),直接 delete 行为未定义 |
| 资源需要归还对象池 |
对象不需要销毁,而是放回池中复用 |
| 特定的销毁函数 |
Win32 的 CloseHandle、OpenSSL 的 X509_free 等,每种资源有各自的释放方式 |
核心问题:FFmpeg 在堆内部分配了附加数据(如帧数据缓冲区、包 side data),这些数据只有 FFmpeg 自己的释放函数知道如何正确清理。直接用 delete 只会释放 AVFrame 结构体本身,泄漏所有内部缓冲区。
三种定义方式
方式 A:函数指针
最直接的方式,适合简单的 C 风格清理函数。缺点是指针本身占用 unique_ptr 的空间(8 字节)。
1 2 3 4 5 6 7 8 9
| #include <memory> #include <cstdio>
void close_file(FILE* fp) { if (fp) fclose(fp); }
std::unique_ptr<FILE, void(*)(FILE*)> ptr(fopen("test.txt", "w"), close_file);
|
方式 B:Lambda 表达式
Lambda 更简洁,不需要单独定义外部函数。但需要注意:
- 无捕获的 Lambda 才不占用额外空间
- 模板参数要写
decltype(lambda),不够直观
1 2
| auto deleter = [](int* p) { delete p; }; std::unique_ptr<int, decltype(deleter)> ptr(new int(42), deleter);
|
方式 C:仿函数(Functor)——项目采用的方式
定义一个只包含 operator() 的 struct。推荐原因:
- 零开销:空类(无成员变量)享受空基类优化,
unique_ptr 体积不增加
- 类型可命名:可以直接写在模板参数里,比
decltype 更干净
- 可复用:同一个 Deleter 类型适用于多处
unique_ptr 声明
1 2 3 4 5
| struct MyDeleter { void operator()(int* p) const { delete p; } };
std::unique_ptr<int, MyDeleter> ptr(new int(10));
|
项目中的实际使用:FFmpegPtrs.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| struct AVFrameDeleter { AVFrameDeleter() = default; void operator()(AVFrame* frame) const { if (frame) { av_frame_free(&frame); } } }; using FramePtr = std::unique_ptr<AVFrame, AVFrameDeleter>;
struct AVPacketDeleter { AVPacketDeleter() = default; void operator()(AVPacket* packet) const { if (packet) { av_packet_free(&packet); } } }; using AVPacketPtr = std::unique_ptr<AVPacket, AVPacketDeleter>;
|
定义了这两个别名之后,整个项目里就用 FramePtr / AVPacketPtr 代替裸指针,配合 BaseQueue<FramePtr> 这样的泛型队列,资源管理完全自动化——队列 clear() 或对象离开作用域时,FFmpeg 的资源自动通过自定义 deleter 正确释放。
FrameQueue 的扩展:尾部保护
FrameQueue 继承 BaseQueue<FramePtr>,新增了与帧丢弃相关的逻辑:
markDraining() / draining() — 标记流即将结束(进入 draining 状态)
protectTailFrom(ptsSeconds) / protectsTailFrame(ptsSeconds) — 当帧的 PTS 进入尾部保护区间时,即使落后时钟也不丢弃,保证最后几帧能正常显示,不会因为同步机制而跳帧到结尾
这一块会在后续的 A/V 同步文章中展开。
这次的插图来自画师JAChrysant
图片地址:https://www.pixiv.net/artworks/94827189
本项目源码请看:https://github.com/DongGuZhengHuaJi/VideoPlayer