完善资料让更多小伙伴认识你,还能领取20积分哦, 立即完善>
|
|
|
相关推荐
2个回答
|
|
|
MediaPipe 感觉中文直译为“媒体管道”,为啥会有这个名字呢?因为它把数据+处理组合成一个计算节点,然后把一个一个节点连接起来,用数据来驱动整个核心逻辑。如果大家对Caffe、ncnn类似的框架源码有一点点了解的话,就会觉得他们非常像,但是又不像。像是说,都是通过配置文件定义计算节点逻辑图,然后通过运算,得到我们想得到的逻辑图中节点的数据。不像的话,就是说的是Mediapipe的调度机制了,极大的增加了节点计算的并行功能,而那些框架是按照图节点的上下顺序进行执行的。
上面这段话可能有点抽象,我想表达的就是 Mediapipe 就是把任何一个“操作”都可以变为一个Calculator,因为我们的每一个项目的逻辑抽象出来都是 Calculator0+Data0->Calculator1+Data1->Calculator2+Data2->… …,然后,Mediapipe 做的是基于这种calculator的调度和执行。这里我举个栗子:
Packet Packet,是mediapipe中的数据单元,它可以接收任意类型的数据。也是mediapipe中的数据流动单元。就是在mediapipe中,我们设计的Graph中,所有的逻辑流动都是通过packet流动来实现的。 实例代码片段: //MakePacket MP_RETURN_IF_ERROR(graph.AddPacketToInputStream("in", MakePacket //从packet中获取数据 mediapipe::Packet packet; packet.Get Graph Graph是由各个Calculator组成的,可以直接把Calculator理解为数据结构中图的节点。而Graph直接把他当做图就行了。Graph是我们定义的逻辑流程的具体载体,也就是说我们的业务逻辑是什么样子的,那么Graph里面就会有相应的逻辑流程。可以具备多输入输出。 实例代码片段: CalculatorGraph graph; Caculator(Node) 上面不是介绍了Graph和Packet嘛,这里的Calculator就是Graph里面的节点,也是处理Packet的具体单元。可以具备多输入输出。 实例代码片段: //比如mediapipe/calculators/core/pass_through_calculator.h里面的定义,这个Calculator被Helloworld这个例子使用,作用就是把输入的数据直接传递到输出,不做任何处理,类似NOP class PassThroughCalculator : public CalculatorBase Stream Stream 就是 Caculator 之间的连接起来后,形成的一个数据流动路径。 Side packets Side packets 可以直接理解为一些静态的数据packet,在graph创建之后就不会改变的数据。 … … 自定义实现Calculator /* * @Description: * @Author: Sky * @Date: * @LastEditors: Sky * @LastEditTime: * @Github: */ #include #include "mediapipe/framework/calculator_graph.h" #include "mediapipe/framework/port/logging.h" #include "mediapipe/framework/port/parse_text_proto.h" #include "mediapipe/framework/port/status.h" //customer calculator #include "mediapipe/framework/calculator_framework.h" #include "mediapipe/framework/port/canonical_errors.h" class CustomerDataType{ public: CustomerDataType(int i, float f, bool b, const std::string & str): val_i(i), val_f(f), val_b(b), s_str(str) {} int val_i = 1; float val_f = 11.f; bool val_b = true; std::string s_str = "customer str."; }; namespace mediapipe { class MyStringProcessCalculator : public CalculatorBase { public: /* Calculator authors can specify the expected types of inputs and outputs of a calculator in GetContract(). When a graph is initialized, the framework calls a static method to verify if the packet types of the connected inputs and outputs match the information in this specification. */ static absl::Status GetContract(CalculatorContract* cc) { /* class InputStreamShard; typedef internal::Collection class OutputStreamShard; typedef internal::Collection */ //cc->Inputs().NumEntries() returns the number of input streams // if (!cc->Inputs().TagMap()->SameAs(*cc->Outputs().TagMap())) { // return absl::InvalidArgumentError("Input and output streams's TagMap can't be same."); // } //set stream // for (CollectionItemId id = cc->Inputs().BeginId(); id < cc->Inputs().EndId(); ++id) { // cc->Inputs().Get(id).SetAny(); // cc->Outputs().Get(id).SetSameAs(&cc->Inputs().Get(id)); // } cc->Inputs().Index(0).SetAny(); cc->Inputs().Index(1).Set cc->Outputs().Index(0).SetSameAs(&cc->Inputs().Index(0)); //set stream package // for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) { // cc->InputSidePackets().Get(id).SetAny(); // } // cc->InputSidePackets().Index(0).SetAny(); // cc->InputSidePackets().Index(1).Set if (cc->OutputSidePackets().NumEntries() != 0) { // if (!cc->InputSidePackets().TagMap()->SameAs(*cc->OutputSidePackets().TagMap())) { // return absl::InvalidArgumentError("Input and output side packets's TagMap can't be same."); // } // for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) { // cc->OutputSidePackets().Get(id).SetSameAs(&cc->InputSidePackets().Get(id)); // } cc->OutputSidePackets().Index(0).SetSameAs(&cc->InputSidePackets().Index(0)); } return absl::OkStatus(); } absl::Status Open(CalculatorContext* cc) final { for (CollectionItemId id = cc->Inputs().BeginId();id < cc->Inputs().EndId(); ++id) { if (!cc->Inputs().Get(id).Header().IsEmpty()) { cc->Outputs().Get(id).SetHeader(cc->Inputs().Get(id).Header()); } } if (cc->OutputSidePackets().NumEntries() != 0) { for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) { cc->OutputSidePackets().Get(id).Set(cc->InputSidePackets().Get(id)); } } // Sets this packet timestamp offset for Packets going to all outputs. // If you only want to set the offset for a single output stream then // use OutputStream::SetOffset() directly. cc->SetOffset(TimestampDiff(0)); return absl::OkStatus(); } absl::Status Process(CalculatorContext* cc) final { if (cc->Inputs().NumEntries() == 0) { return tool::StatusStop(); } //get node input data mediapipe::Packet _data0 = cc->Inputs().Index(0).Value(); mediapipe::Packet _data1 = cc->Inputs().Index(1).Value(); //not safety. char _tmp_buf[1024]; ::memset(_tmp_buf, 0, 1024); snprintf(_tmp_buf, 1024, _data0.Get std::string _out_data = _tmp_buf; cc->Outputs().Index(0).AddPacket(MakePacket return absl::OkStatus(); } absl::Status Close(CalculatorContext* cc) final { return absl::OkStatus(); } }; REGISTER_CALCULATOR(MyStringProcessCalculator); } namespace mediapipe { absl::Status RunMyGraph() { // Configures a simple graph, which concatenates 2 PassThroughCalculators. CalculatorGraphConfig config = ParseTextProtoOrDie input_stream: "in" input_stream: "customer_in" output_stream: "out" node { calculator: "PassThroughCalculator" input_stream: "in" output_stream: "out1" } node { calculator: "MyStringProcessCalculator" input_stream: "out1" input_stream: "customer_in" output_stream: "out2" } node { calculator: "PassThroughCalculator" input_stream: "out2" output_stream: "out" } )"); LOG(INFO)<<"parse graph cfg-str done ... ..."; CalculatorGraph graph; MP_RETURN_IF_ERROR(graph.Initialize(config)); LOG(INFO)<<"init graph done ... ..."; ASSIGN_OR_RETURN(OutputStreamPoller poller, graph.AddOutputStreamPoller("out")); LOG(INFO)<<"add out-node to output-streampoller done ... ..."; MP_RETURN_IF_ERROR(graph.StartRun({})); LOG(INFO)<<"start run graph done ... ..."; // Give 10 input packets that contains the same std::string "Hello World!". for (int i = 0; i < 10; ++i) { MP_RETURN_IF_ERROR(graph.AddPacketToInputStream( "in", MakePacket MP_RETURN_IF_ERROR(graph.AddPacketToInputStream( "customer_in", MakePacket } // Close the input stream "in". MP_RETURN_IF_ERROR(graph.CloseInputStream("in")); MP_RETURN_IF_ERROR(graph.CloseInputStream("customer_in")); mediapipe::Packet packet; // Get the output packets std::string. while (poller.Next(&packet)) { LOG(INFO) << packet.Get } LOG(INFO)<<"RunGraph Done"; return graph.WaitUntilDone(); } } // namespace mediapipe int main(int argc, char** argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); FLAGS_minloglevel = 0; FLAGS_stderrthreshold = 0; FLAGS_alsologtostderr = 1; google::InitGoogleLogging(argv[0]); LOG(INFO) << "glog init success ... ..."; absl::Status run_status = mediapipe::RunMyGraph(); if (!run_status.ok()) LOG(ERROR) << "Failed to run the graph: " << run_status.message(); google::ShutdownGoogleLogging(); return 0; } |
|
|
|
|
|
下面简单介绍这段代码。
自定义Calculator:MyStringProcessCalculator 这里自定义了一个Calculator,主要作用就是传入snprintf的fmt字符串和fmt字符串所需要的数据。所以可以看到有两个输入,一个是string,一个是我自定义的data-type。输出是一个格式化之后的字符串,所以输出是string。 自定义Calculator主要还是实现4个接口,分别是GetContract,Open,Process,Close。其中GetContract是Graph初始化的时候,检查Calculator用的。Open接口是在Graph开始后,对Calculator做一些初始化工作,例如设定一些Calculator初始状态等。Process是实际的Calculator功能。 namespace mediapipe { class MyStringProcessCalculator : public CalculatorBase { public: /* Calculator authors can specify the expected types of inputs and outputs of a calculator in GetContract(). When a graph is initialized, the framework calls a static method to verify if the packet types of the connected inputs and outputs match the information in this specification. */ static absl::Status GetContract(CalculatorContract* cc) { /* class InputStreamShard; typedef internal::Collection class OutputStreamShard; typedef internal::Collection */ //cc->Inputs().NumEntries() returns the number of input streams // if (!cc->Inputs().TagMap()->SameAs(*cc->Outputs().TagMap())) { // return absl::InvalidArgumentError("Input and output streams's TagMap can't be same."); // } //set stream // for (CollectionItemId id = cc->Inputs().BeginId(); id < cc->Inputs().EndId(); ++id) { // cc->Inputs().Get(id).SetAny(); // cc->Outputs().Get(id).SetSameAs(&cc->Inputs().Get(id)); // } cc->Inputs().Index(0).SetAny(); cc->Inputs().Index(1).Set cc->Outputs().Index(0).SetSameAs(&cc->Inputs().Index(0)); //set stream package // for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) { // cc->InputSidePackets().Get(id).SetAny(); // } // cc->InputSidePackets().Index(0).SetAny(); // cc->InputSidePackets().Index(1).Set if (cc->OutputSidePackets().NumEntries() != 0) { // if (!cc->InputSidePackets().TagMap()->SameAs(*cc->OutputSidePackets().TagMap())) { // return absl::InvalidArgumentError("Input and output side packets's TagMap can't be same."); // } // for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) { // cc->OutputSidePackets().Get(id).SetSameAs(&cc->InputSidePackets().Get(id)); // } cc->OutputSidePackets().Index(0).SetSameAs(&cc->InputSidePackets().Index(0)); } return absl::OkStatus(); } absl::Status Open(CalculatorContext* cc) final { for (CollectionItemId id = cc->Inputs().BeginId();id < cc->Inputs().EndId(); ++id) { if (!cc->Inputs().Get(id).Header().IsEmpty()) { cc->Outputs().Get(id).SetHeader(cc->Inputs().Get(id).Header()); } } if (cc->OutputSidePackets().NumEntries() != 0) { for (CollectionItemId id = cc->InputSidePackets().BeginId(); id < cc->InputSidePackets().EndId(); ++id) { cc->OutputSidePackets().Get(id).Set(cc->InputSidePackets().Get(id)); } } // Sets this packet timestamp offset for Packets going to all outputs. // If you only want to set the offset for a single output stream then // use OutputStream::SetOffset() directly. cc->SetOffset(TimestampDiff(0)); return absl::OkStatus(); } //这里是整个Calculator的核心,就是调用snprintf absl::Status Process(CalculatorContext* cc) final { if (cc->Inputs().NumEntries() == 0) { return tool::StatusStop(); } //get node input data mediapipe::Packet _data0 = cc->Inputs().Index(0).Value(); mediapipe::Packet _data1 = cc->Inputs().Index(1).Value(); //not safety. char _tmp_buf[1024]; ::memset(_tmp_buf, 0, 1024); snprintf(_tmp_buf, 1024, _data0.Get std::string _out_data = _tmp_buf; cc->Outputs().Index(0).AddPacket(MakePacket return absl::OkStatus(); } absl::Status Close(CalculatorContext* cc) final { return absl::OkStatus(); } }; REGISTER_CALCULATOR(MyStringProcessCalculator); } 然后开始编译运行得到结果 编译。 # 注意,这里的--check_visibility=false 为了关闭bazel关于target之间的可见性检查,因为我的Calculator自定义放在我自己的目录的,有一个target对这个目录不可见,编译会报错。 bazel build -c dbg --define MEDIAPIPE_DISABLE_GPU=1 --copt -DMESA_EGL_NO_X11_HEADERS --copt -DEGL_NO_X11 my_target --check_visibility=false --verbose_failures --local_cpu_resources=1 然后运行。得到如下图的结果。 后记 好了,一个超级简单的自定义calculator已经实现了,相信你已经明白了吧。本系列也就此终结吧,以后随缘更新。 |
|
|
|
|
你正在撰写答案
如果你是对答案或其他答案精选点评或询问,请使用“评论”功能。
谁有3566+电池+POE充电的方案,有个项目需要用该功能的主板
1240 浏览 0 评论
RK3588的GMAC0与PHY的参考时钟电平匹配问题??????
6253 浏览 1 评论
请问各位大佬,如何解决,瑞芯微 RV1126B 使用 mpp 自带工具 调试时,内核直接报错崩溃!
2055 浏览 0 评论
使用rk3568开发板,核0\\1\\3运行linux,核2运行hal,在核0中怎么关闭核2
2599 浏览 0 评论
2596 浏览 0 评论
浏览过的版块 |
/9
小黑屋| 手机版| Archiver| 电子发烧友 ( 湘ICP备2023018690号 )
GMT+8, 2025-12-10 18:44 , Processed in 0.708893 second(s), Total 71, Slave 54 queries .
Powered by 电子发烧友网
© 2015 bbs.elecfans.com
关注我们的微信
下载发烧友APP
电子发烧友观察
版权所有 © 湖南华秋数字科技有限公司
电子发烧友 (电路图) 湘公网安备 43011202000918 号 电信与信息服务业务经营许可证:合字B2-20210191

淘帖
1168