在嵌入式系统开发中,大型项目往往采用微服务架构来构建,其核心思想是将一个庞大的单体应用分割成一系列小型、独立、松耦合的服务模块,这些模块可以是以线程或进程形式存在的多个服务单元。各服务间为了协同工作,不可避免地需要进行进程间通信(IPC, Inter-Process Communication)。
已有的IPC方案众多,包括但不限于信号、管道、消息队列和Socket通信等。此前也分享过系列文章,详细介绍过这些方案的使用方式(可以在公众号聊天界面获取历史文章目录)。不过,大多数传统IPC方案主要侧重于单向数据传递,对于服务调用后的同步返回值处理并未提供直接的支持。
鉴于此,本文参照Android平台中的Binder机制,设计并实现了一套具备同步返回值功能的RPC(Remote Procedure Call,远程过程调用)方案。这套方案汲取了Binder的优点,能够有效地在进程间进行服务调用并同步接收返回结果,解决了传统IPC方案在双向通信方面的局限性,提升了嵌入式应用中服务间通信的效率和灵活性。
首先,对于RPC的实现要求,数据传输的顺序必须按照接口传入参数的顺序依次传输。调用者和被调用者保持相同的内存偏移同步写入和读取,确保数据不乱套。
为什么选用共享内存,而非其他的IPC方案?
为什么采用环形缓冲区?
我们的目的是实现进程间接口的远程调用,外部的需求主要两点:1.参数传递 2. 结果同步返回。
基于此,大致时序如下:
共享环形缓冲区时序图
首先约定:服务端与客户端各创建一片共享内存和信号量。同时持有彼此的共享内存和信号量。(方便调试的做法,实际项目应该统一管理分配)
编程环境
接口定义
struct Root
{
uint8_t work; // 使能状态
uint8_t busy; // 忙碌状态
uint8_t rwStatus; // 可读状态
uint32_t wp; // 写入位置
uint32_t rp; // 读取位置
};
enum ECmdType
{
CMD_WRITEABLE = 0x01,
CMD_READABLE = 0x02,
CMD_BUTT,
};
class SharedRingBuffer
{
public:
SharedRingBuffer(std::string path, uint32_t capacity);
~SharedRingBuffer();
bool IsReadable() const noexcept;
bool IsWriteable() const noexcept;
int write(const void* data, uint32_t len);
int read(void* data, uint32_t len);
private:
uint32_t AvailSpace() const noexcept;
uint32_t AvailData() const noexcept;
void SetRWStatus(ECmdType type) const noexcept;
void DumpMemory(const char* pAddr, uint32_t size);
void DumpErrorInfo();
private:
Root* mRoot;
void* mData;
uint32_t mCapacity;
std::mutex mMutex;
std::string mShmPath;
};
SharedRingBuffer对外仅暴露四个接口,主要用于数据的检查和读写。
class Parcel
{
public:
Parcel(std::string path, int key, bool master);
~Parcel();
int WriteBool(bool value);
int ReadBool(bool& value);
int WriteInt(int value);
int ReadInt(int& value);
int WriteString(const std::string& value);
int ReadString(std::string& value);
int WriteData(void* data, int size);
int ReadData(void* data, int& size);
int wait();
int post();
private:
bool mMaster;
int mShmKey;
sem_t* mSem ;
std::string mShmPath;
SharedRingBuffer* mRingBuffer;
};
Parcel持有共享环形缓冲区和信号量,负责数据的封装。对外提供各种数据类型的写入和读取,同时提供数据同步机制接口wait(),post() 。
关键接口实现 篇幅有限,文章仅列举关键实现接口(完整代码可在聊天界面输入标题获取)
int SharedRingBuffer::write(const void* data, uint32_t len) {
int ret = -1;
int retry = RETRY_TIMES;
// It's hard to believe, but it actually happened:
// Although post after it is written in the shared memory, synchronization still might not be timely,
// and the AvailSpace() returns 0. Only add a retry to avoid it
while (retry > 0) {
std::lock_guard<std::mutex> lock(mMutex);
int32_t avail = AvailSpace();
if (avail >= len) {
memcpy(static_cast<char*>(mData) + mRoot->wp, data, len);
mRoot->wp = (mRoot->wp + len) % mCapacity;
SetRWStatus(CMD_READABLE);
ret = 0;
break;
} else {
SPR_LOGE("AvailSpace invalid! avail = %d\n", avail);
DumpErrorInfo();
retry--;
usleep(RETRY_INTERVAL_US);
}
}
return ret;
}
write 接口实现的是将数据写入共享内存,并同步写入偏移量和相关状态。这里加了失败重试机制和一些线程同步。
int SharedRingBuffer::read(void* data, uint32_t len)
{
int ret = -1;
int retry = RETRY_TIMES;
// Refer to write comments
while (retry > 0) {
std::lock_guard<std::mutex> lock(mMutex);
int32_t avail = AvailData();
if (avail >= len) {
memcpy(data, static_cast<char*>(mData) + mRoot->rp, len);
mRoot->rp = (mRoot->rp + len) % mCapacity;
SetRWStatus(CMD_WRITEABLE);
ret = 0;
break;
} else {
SPR_LOGE("AvailData invalid! avail = %d, len = %d\n", avail, len);
DumpErrorInfo();
retry--;
usleep(RETRY_INTERVAL_US);
}
}
return ret;
}
read 接口实现的是将数据从共享内存读取出。大致流程与write一致。
实现一个简单的例子,客户端远程调用服务端的一个接口 CalculateSum(int val1, int val2)
static int CalculateSum(int val1, int val2)
{
return val1 + val2;
}
void ServerHandleRequest(Parcel& req, Parcel& reply)
{
int cmd;
req.ReadInt(cmd);
switch (cmd)
{
case PARCEL_CMD_CACULATE_SUM:
{
int val1 = 0;
int val2 = 0;
req.ReadInt(val1);
req.ReadInt(val2);
int sum = CalculateSum(val1, val2);
reply.WriteInt(sum);
break;
}
default:
SPR_LOGE("Invaild Cmd(0x%x)!\n", cmd);
break;
}
reply.post();
}
int main()
{
Parcel replyParcel("client_rpc", 88888, false);
Parcel reqParcel("server_rpc", 12345, true);
while (true)
{
reqParcel.wait();
ServerHandleRequest(reqParcel, replyParcel);
}
return 0;
}
Parcel reqParcel("server_rpc", 12345, false);
Parcel replyParcel("client_rpc", 88888, true);
int CalculateSum(int val1, int val2)
{
int sum = 0;
reqParcel.WriteInt(PARCEL_CMD_CACULATE_SUM);
reqParcel.WriteInt(val1);
reqParcel.WriteInt(val2);
reqParcel.post();
replyParcel.wait();
replyParcel.ReadInt(sum);
return sum;
}
int main() {
char in = 0;
do {
SPR_LOGD("Input: ");
scanf("%c", &in);
getchar();
switch (in)
{
case '3':
{
int val1 = 0;
int val2 = 0;
SPR_LOGD("Input val1 val2: ");
scanf("%d %d", &val1, &val2);
getchar();
int sum = CalculateSum(val1, val2);
SPR_LOGD("sum = %d\n", sum);
break;
}
default:
break;
}
} while (in != 'q');
return 0;
}
Client D: Input val1 val2: 11 22
Client D: sum = 33
Client D: Input val1 val2: 10 10
Client D: sum = 20
用心感悟,认真记录,写好每一篇文章,分享每一框干货。