该程序是基于OpenHarmony的C++公共基础类库的读写锁:SafeBlockQueue。
线程安全阻塞队列SafeBlockQueue类,提供阻塞和非阻塞版的入队入队和出队接口,并提供可最追踪任务完成状态的的SafeBlockQueueTracking类。
本案例主要完成如下工作:
(1)使用SafeBlockQueue接口的案例
(2)使用SafeBlockQueueTracking接口的案例
该案例已在凌蒙派-RK3568开发板验证过,如需要完整源代码,请参考:
C++公共基础类库为标准系统提供了一些常用的C++开发工具类,包括:
修改需调用模块的BUILD.gn,在external_deps或deps中添加如下:
ohos_shared_library("xxxxx") {
...
external_deps = [
...
# 动态库依赖(可选)
"c_utils:utils",
# 静态库依赖(可选)
"c_utils:utilsbase",
# Rust动态库依赖(可选)
"c_utils:utils_rust",
]
...
}
一般而言,我们只需要填写"c_utils:utils"即可。
C++公共基础类库的SafeBlockQueue头文件在://commonlibrary/c_utils/base/include/safe_block_queue.h
可在源代码中添加如下:
#include <safe_block_queue.h>
构造函数。
SafeBlockQueue(int capacity)
参数说明:
参数名称 | 类型 | 参数说明 |
---|---|---|
capacity | int | SafeBlockQueue的容量,即能存储多少个单元 |
析构函数。
~SafeBlockQueue();
入队操作(阻塞版)。
void virtual Push(T const& elem);
入队操作(非阻塞版)。
bool virtual PushNoWait(T const& elem);
返回值说明:
类型 | 返回值说明 |
---|---|
bool | true表示成功,false表示失败 |
出队操作(阻塞版)。
T Pop();
返回值说明:
类型 | 返回值说明 |
---|---|
T | 出队的单元 |
出队操作(非阻塞版)。
bool PopNotWait(T& outtask);
参数说明:
参数名称 | 类型 | 参数说明 |
---|---|---|
outtask | T | 出队的单元 |
返回值说明:
类型 | 返回值说明 |
---|---|
bool | true表示成功,false表示失败 |
获取队列容量。
unsigned int Size();
返回值说明:
类型 | 返回值说明 |
---|---|
unsigned int | 返回队列的容量 |
队列判空。
bool IsEmpty;
返回值说明:
类型 | 返回值说明 |
---|---|
bool | true表示成功,false表示失败 |
判断map是否为满。
bool IsFull();
返回值说明:
类型 | 返回值说明 |
---|---|
bool | true表示空,false表示非空 |
构造函数。
SafeBlockQueue(int capacity)
参数说明:
参数名称 | 类型 | 参数说明 |
---|---|---|
capacity | int | SafeBlockQueue的容量,即能存储多少个单元 |
析构函数。
~SafeBlockQueue();
入队操作(阻塞版)。
void virtual Push(T const& elem);
入队操作(非阻塞版)。
bool virtual PushNoWait(T const& elem);
返回值说明:
类型 | 返回值说明 |
---|---|
bool | true表示成功,false表示失败 |
在上一级目录BUILD.gn文件添加一行编译引导语句。
import("//build/ohos.gni")
group("samples") {
deps = [
"a29_utils_safeblockqueue:utils_safeblockqueue", # 添加该行
]
}
"a29_utils_safeblockqueue:utils_safeblockqueue",
该行语句表示引入 参与编译。
创建a29_utils_safeblockqueue目录,并添加如下文件:
a29_utils_safeblockqueue
├── utils_safeblockqueue_sample.cpp # .cpp源代码
├── utils_safeblockqueuetracking_sample.cpp # .cpp源代码
├── BUILD.gn # GN文件
编辑BUILD.gn文件。
import("//build/ohos.gni")
ohos_executable("utils_safeblockqueue_test") {
sources = [ "utils_safeblockqueue_sample.cpp" ]
include_dirs = [
"//commonlibrary/c_utils/base/include",
"//commonlibrary/c_utils/base:utils",
"//third_party/googletest:gtest_main",
"//third_party/googletest/googletest/include"
]
external_deps = [
"c_utils:utils"
]
part_name = "product_rk3568"
install_enable = true
}
ohos_executable("utils_safeblockqueue_tracking") {
sources = [ "utils_safeblockqueuetracking_sample.cpp" ]
include_dirs = [
"//commonlibrary/c_utils/base/include",
"//commonlibrary/c_utils/base:utils",
"//third_party/googletest:gtest_main",
"//third_party/googletest/googletest/include"
]
external_deps = [
"c_utils:utils"
]
part_name = "product_rk3568"
install_enable = true
}
group("utils_safeblockqueue") {
deps = [
":utils_safeblockqueue_test", # 构建SafeBlockQueue案例
":utils_safeblockqueue_tracking", # 构建SafeBlockQueueTracking案例
]
}
注意:
(1)BUILD.gn中所有的TAB键必须转化为空格,否则会报错。如果自己不知道如何规范化,可以:
# 安装gn工具
sudo apt-get install ninja-build
sudo apt install generate-ninja
# 规范化BUILD.gn
gn format BUILD.gn
#include <safe_block_queue.h> // SafeBlockQueue的头文件
// 定义常量
const int SIZE = 5;
// 定义SafeBlockQueue变量
OHOS::SafeBlockQueue<int> m_safeBlockQueue(SIZE);
命令有1个参数,分别是:
int main(int argc, char **argv)
{
bool enable_wait = true;
......
// 获取命令行参数
if (argc != 2) {
cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl;
return -1;
}
if (strncmp(argv[1], STRING_WAIT, sizeof(STRING_WAIT)) == 0) {
enable_wait = true;
} else if (strncmp(argv[1], STRING_NOWAIT, sizeof(STRING_NOWAIT)) == 0) {
enable_wait = false;
} else {
cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl;
return -1;
}
}
int main(int argc, char **argv)
{
OHOS::ThreadPool threads("threads");
string str_name;
......
threads.SetMaxTaskNum(4);
threads.Start(2);
......
}
调用AddTask()添加子线程,并调用Stop()等待所有子进程结束。
// 创建生产者线程
cout << get_curtime() << ", " << __func__ << ": task_product start" << endl;
auto task_product = (enable_wait) ? (std::bind(product_wait, str_name)) : (std::bind(product_nowait, str_name));
threads.AddTask(task_product);
// 等待SIZE秒,将SafeBlockQueue容器填满
cout << get_curtime() << ", " << __func__ << ": sleep " << SIZE << " sec" << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000 * SIZE));
// 创建消费者线程
cout << get_curtime() << ", " << __func__ << ": consume start" << endl;
auto task_consumer = (enable_wait) ? (std::bind(consume_wait, str_name)) : (std::bind(consume_nowait, str_name));
threads.AddTask(task_consumer);
threads.Stop();
cout << get_curtime() << ", " << __func__ << ": Queue Wait End" << endl;
static void product_wait(const string &name)
{
for (int i = 0; i < (2 * SIZE); i++) {
cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl;
// 使用阻塞方式的SafeBlockQueue
m_safeBlockQueue.Push(i);
cout << get_curtime() << ", " << __func__ << ": Push Success, i = " << i << endl;
// 等待1秒
cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
static void consume_wait(const string &name)
{
for (int i = 0; i < (2 * SIZE); i++) {
cout << get_curtime() << ", " << __func__ << ": Pop Start, i = " << i << endl;
// 使用阻塞方式的SafeBlockQueue
int value = m_safeBlockQueue.Pop();
cout << get_curtime() << ", " << __func__ << ": Pop Success, i = " << i << ", value = " << value << endl;
// 等待0.5秒
cout << get_curtime() << ", " << __func__ << ": Sleep 0.5 sec " << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
static void product_nowait(const string &name)
{
bool ret;
for (int i = 0; i < (2 * SIZE); i++) {
cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl;
// 使用非阻塞方式的SafeBlockQueue
ret = m_safeBlockQueue.PushNoWait(i);
cout << get_curtime() << ", " << __func__ << ": Push ret = " << ret << ", i = " << i << endl;
// 等待1秒
cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
static void consume_nowait(const string &name)
{
for (int i = 0; i < (SIZE * 2); i++) {
// 等待有新数据
int value = 0;
// 使用非阻塞方式的SafeBlockQueue
bool ret = m_safeBlockQueue.PopNotWait(value);
cout << get_curtime() << ", " << __func__ << ": PopNotWait ret = " << ret << ", value = " << value << endl;
// 等待500毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
#include <safe_block_queue.h> // SafeBlockQueue的头文件
// 定义常量
const int SIZE = 5;
// 定义SafeBlockQueue变量
OHOS::SafeBlockQueueTracking<int> m_safeBlockQueueTracking(SIZE);
命令有1个参数,分别是:
int main(int argc, char **argv)
{
bool enable_wait = true;
......
// 获取命令行参数
if (argc != 2) {
cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl;
return -1;
}
if (strncmp(argv[1], STRING_WAIT, sizeof(STRING_WAIT)) == 0) {
enable_wait = true;
} else if (strncmp(argv[1], STRING_NOWAIT, sizeof(STRING_NOWAIT)) == 0) {
enable_wait = false;
} else {
cout << "Usage: " << argv[0] << " " << STRING_WAIT << "/" << STRING_NOWAIT << endl;
return -1;
}
}
int main(int argc, char **argv)
{
OHOS::ThreadPool threads("threads");
string str_name;
......
threads.SetMaxTaskNum(4);
threads.Start(2);
......
}
调用AddTask()添加子线程,并调用Stop()等待所有子进程结束。
// 创建生产者线程
cout << get_curtime() << ", " << __func__ << ": task_product start" << endl;
auto task_product = (enable_wait) ? (std::bind(product_wait, str_name)) : (std::bind(product_nowait, str_name));
threads.AddTask(task_product);
// 等待SIZE秒,将SafeBlockQueue容器填满
cout << get_curtime() << ", " << __func__ << ": sleep " << SIZE << " sec" << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000 * SIZE));
// 创建消费者线程
cout << get_curtime() << ", " << __func__ << ": consume start" << endl;
auto task_consumer = (enable_wait) ? (std::bind(consume_wait, str_name)) : (std::bind(consume_nowait, str_name));
threads.AddTask(task_consumer);
threads.Stop();
cout << get_curtime() << ", " << __func__ << ": Queue Wait End" << endl;
static void product_wait(const string &name)
{
for (int i = 0; i < (2 * SIZE); i++) {
cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl;
// 使用阻塞方式的SafeBlockQueueTracking
m_safeBlockQueueTracking.Push(i);
cout << get_curtime() << ", " << __func__ << ": Push Success, i = " << i << endl;
// 等待1秒
cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
static void consume_wait(const string &name)
{
for (int i = 0; i < (2 * SIZE); i++) {
cout << get_curtime() << ", " << __func__ << ": Pop Start, i = " << i << endl;
// 使用阻塞方式的SafeBlockQueueTracking
int value = m_safeBlockQueueTracking.Pop();
cout << get_curtime() << ", " << __func__ << ": Pop Success, i = " << i << ", value = " << value << endl;
m_safeBlockQueueTracking.OneTaskDone();
cout << get_curtime() << ", " << __func__ << ": Push OneTaskDone successful" << endl;
// 等待0.5秒
cout << get_curtime() << ", " << __func__ << ": Sleep 0.5 sec " << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
static void product_nowait(const string &name)
{
bool ret;
for (int i = 0; i < (2 * SIZE); i++) {
cout << get_curtime() << ", " << __func__ << ": Push Start, i = " << i << endl;
// 使用非阻塞方式的SafeBlockQueueTracking
ret = m_safeBlockQueueTracking.PushNoWait(i);
cout << get_curtime() << ", " << __func__ << ": Push ret = " << ret << ", i = " << i << endl;
if (ret == true) {
m_safeBlockQueueTracking.OneTaskDone();
cout << get_curtime() << ", " << __func__ << ": Push OneTaskDone successful" << endl;
}
// 等待1秒
cout << get_curtime() << ", " << __func__ << ": Sleep 1 sec " << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
static void consume_nowait(const string &name)
{
for (int i = 0; i < (SIZE * 2); i++) {
// 等待有新数据
int value = 0;
// 使用非阻塞方式的SafeBlockQueueTracking
bool ret = m_safeBlockQueueTracking.PopNotWait(value);
cout << get_curtime() << ", " << __func__ << ": PopNotWait ret = " << ret << ", value = " << value << endl;
// 等待500毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
进入OpenHarmony编译环境,运行命令:
hb build -f
#