四足机器人高算力、低成本主控第三步:共享内存实现多核运行

1. 多任务的机器人控制软件框架

在单片机中我们可以使用计时器中断、软件调度或者直接使用嵌入式操作系统如FreeRTOS或UCOSII的方式来实现不同控制任务的定时运行,由于单片机是单核系统因此就算构建了多线程也仅仅是在某个线程sleep时主动将资源释放而言,而在树莓派中其有多个CPU能实现多核心运行,而且当很多高速运动控制任务与单一CPU内核绑定时其运行的速度和实时性都远远优于单片机的操作系统调度,如下图MOCO-ML机器人控制系统中主要的几个任务其均运行在不同的频率因此我们可以采用Linux多线程或多核多任务的方式来实现,从而提高力控带宽到1Khz满足绝大部分机器人的运动控制频率需求:

STM32独立运行时的操作系统线程

如上图所示,越往左的模块其对计算的实时性要求越高,因此我们可以把很多同类调度的软件模块综合在一个大任务中保证其采用单独的CPU运行,但这种方法相比多线程来说任务间的数据交互是一个大问题,在多线程框架中最直接的方式我们可以定义全局变量来完成数据的函数数据的交互,而在多任务框架下每个任务都在不同的内核上运行,数据交互的实时性和数据量都有很严格的要求才能保证机器人的运行。以MIT的四足机器人软件为例其采用LCM来实现进程间的数据通信,LCM(Lightweight Communications and Marshalling,轻量级通信与数据封送库)是一组类库含多种语言如java,c等专门针对实时系统在高带宽和低的延迟的情况下进行消息发送和数据封送处理。它提供了一个发布/订阅消息模型、自动封装/解封代码生成工具含多种编程语言版本。其最初由MIT城市挑战赛小组为DARPA消息传递系统设计。LCM是专为通过局域网连接的tightly-coupled类型系统而设计。 它不适合因特网。LCM研制开发软实时系统它默认允许丢包以减少延时。

LCM也早已经被广泛应用于自动驾驶中,可以说国外的软件框架发展还是更加完善,很多必要的软件模块均已有成熟的库和相应的包可以依赖,这也是MIT这样大学技术实力最核心的体现。因此考虑机器人的软硬件控制机制,我们可以把单片机中的控制框架重新修改一下以多进程和多线程的结构来重新组合。

首先我们可以将机器人主要功能划分为3个层次,第一个是底层伺服驱动:该部分主要是树莓派与单片机的SPI通讯,由于要保证高速数据传输,因此需要单独分配一个进程,同时增加相应保护;第二层是运动控制层:该部分主要完成对机器人运动状态的估计和反馈控制;第三层是导航规划层:该部分主要完成机器人的路径规划和遥控命令处理,另外很多全局保护也可以放在本层中,则在树莓派中机器人的软件框架如下:

树莓派下多核心架构

2. 共享内存与数据交互

基于LCM能实现数据的轻量化数据的实时交互,但是需要而外安装相应的库来支撑,在Linux系统中另外可以通过共享内存的方式来实现数据交互,共享内存是进程间通信中最简单的方式之一。共享内存允许两个或更多进程访问同一块内存,就如同 malloc() 函数向不同进程返回了指向同一个物理内存区域的指针。当一个进程改变了这块地址中的内容的时候,其它进程都会察觉到这个更改。

因此我参考网上很多共享内存的例子来实现一个自己的进程间通信接口,首先我们构建伺服层servo_task的共享内存接口

本层中主要首先使用SPI接口读取单片机回传的编码器数据和IMU等运动传感器数据,进一步我们开辟一片简单的内存将其按浮动数的协议差分为4个字节依次存入其中,首先建立写入内存初始化:

struct shareMemory 
{  
    int  flag=0;  //作为一个标志,非0:表示可读,0表示可写
    unsigned char szMsg[MEM_SIZE];  
};  
struct shareMemory shareMemory_spi;

结构体重申请了MEM_SIZE个Char型的变量内存,之后在初始化中添加下面的代码其中MEM_SPI为内存ID,读取和写入需要一致:

  //共享内存
    int shmid_rx = shmget((key_t)MEM_SPI, sizeof(shareMemory_spi), 0666|IPC_CREAT); //失败返回-1,假设成功。
    //0666表示权限,与文件一样。如0644,它表示允许一个进程创建的共享内存被内存创建者所拥有的进程向共享内存读取和写入数据,同时其他用户创建的进程只能读取共享内存。
    void *shm_rx = shmat(shmid_rx, (void*)0, 0);  //失败返回-1,假设成功 
    shareMemory *pshm_rx = (shareMemory*)shm_rx;  
    pshm_rx->flag = 0; 
    printf("Memory SPI attached at %p\n",shm_rx);

在主循环中读取完SPI数据之后将其写入内存中,并改变数据刷新编制位:

//共享内存写
        if(pshm_rx->flag == 0)  
        {  
            if(!mem_connect){
            printf("MEM SPI CONNECT!!!\n");
            mem_connect=1;
            }
            mem_loss_cnt=0;
            memory_write();
            for(int k=0;k<MEM_SIZE/2-1;k++)
                pshm_rx->szMsg[k]=mem_write_buf[k]; 
            for(int k=MEM_SIZE/2;k<MEM_SIZE;k++)
                mem_read_buf[k]=pshm_rx->szMsg[k];
            memory_read();
            pshm_rx->flag = 1;  
        }else
            mem_loss_cnt+=sys_dt;

写入中首先将读取传感器数据以浮点数依次存入对于的缓存数组write_buf:

void memory_write(void)//写入内存
{
    static float temp=0;
    mem_write_cnt=0;
    setDataFloat_mem( spi_rx.att[0]);
    setDataFloat_mem( spi_rx.att[1]);
    setDataFloat_mem( spi_rx.att[2]);
    setDataFloat_mem( spi_rx.att_rate[0]);
    setDataFloat_mem( spi_rx.att_rate[1]);
    setDataFloat_mem( spi_rx.att_rate[2]);
    setDataFloat_mem( spi_rx.acc_b[0]);
    setDataFloat_mem( spi_rx.acc_b[1]);
    setDataFloat_mem( spi_rx.acc_b[2]);
    for (int i = 0; i < 4; i++)
    {
    setDataFloat_mem(spi_rx.q[i][0]);
    setDataFloat_mem(spi_rx.q[i][1]);
    setDataFloat_mem(spi_rx.tau[i][0]);
    setDataFloat_mem(spi_rx.tau[i][1]);
    }
}

之后将其赋值给共享内存中申请的数组,这样另一个进程在判断标志位变换后就可以读出,注意这里参考SPI同步读写的想法,将数组前半部分用于写入,后半部分读取另一个进程的数据,采用读取函数实际就是将内存中的数据重新组装成浮点数:

void memory_read(void){//读取内存
int mem_read_cnt=MEM_SIZE/2;
float test1,test2;
for (int i = 0; i < 4; i++)
{
    spi_tx.q_set[i][0] = floatFromData_spi(mem_read_buf, &mem_read_cnt);
    spi_tx.q_set[i][1] = floatFromData_spi(mem_read_buf, &mem_read_cnt);
    spi_tx.q_reset[i][0] = floatFromData_spi(mem_read_buf, &mem_read_cnt);
    spi_tx.q_reset[i][1] = floatFromData_spi(mem_read_buf, &mem_read_cnt);
    spi_tx.tau_ff[i][0] = floatFromData_spi(mem_read_buf, &mem_read_cnt);
    spi_tx.tau_ff[i][1] = floatFromData_spi(mem_read_buf, &mem_read_cnt);
}
spi_tx.max_i= floatFromData_spi(mem_read_buf, &mem_read_cnt);
spi_tx.kp= floatFromData_spi(mem_read_buf, &mem_read_cnt);
spi_tx.ki= floatFromData_spi(mem_read_buf, &mem_read_cnt);
spi_tx.kd= floatFromData_spi(mem_read_buf, &mem_read_cnt);
spi_tx.en_motor= mem_read_buf[mem_read_cnt++];
spi_tx.reser_q= mem_read_buf[mem_read_cnt++];
spi_tx.reset_err= mem_read_buf[mem_read_cnt++];
}

因此,在每次SPI读取完后将数据写入内存中并解析内存后半部分来自别的进程的数据,最后将标志位赋1;

对于接受进程比如我们的control_task,首先需要构建一个与发送同样的结构体:

struct shareMemory 
{  
    int  flag=0;  //作为一个标志,非0:表示可读,0表示可写
    unsigned char szMsg[MEM_SIZE];  
};  
struct shareMemory shareMemory_spi;

初始化部分稍微不同,需要保证MEM_SPI访问的内存ID一致:

    int shmid_rx = shmget((key_t)MEM_SPI, sizeof(shareMemory_spi), 0666|IPC_CREAT); //失败返回-1,假设成功  
    void *shm_rx = shmat(shmid_rx, 0, 0);  
    shareMemory *pshm_rx = (shareMemory*)shm_rx;  
    printf("Memory SPI attached at %p\n",shm_rx);

之后再主函数中判断标志位为1即数据刷新后开始读取,因此其读写顺序与发送相反,先读取后写入再将标志位赋0,从而完成一次同步读写:

 if(pshm_rx->flag == 1)  
        {  

            if(!mem_connect){
            printf("MEM SPI CONNECT!!!\n");
            mem_connect=1;
            }
            mem_loss_cnt=0;

            for(int k=0;k<MEM_SIZE/2-1;k++)
                mem_read_buf[k]=pshm_rx->szMsg[k];
            memory_read();
            memory_write();
            for(int k=MEM_SIZE/2;k<MEM_SIZE;k++)
                pshm_rx->szMsg[k]=mem_write_buf[k];     
            
            pshm_rx->flag = 0;  
        }  

这样当两个任务都跑起来时就可以通过共享内存的方式来实现数据互传,当然我实现的方式比较粗糙类似传统应答的模式,但能力有限先保证两个部分功能能够实现,当在两个不同终端中启动servo和control程序则可以看到数据互通成功,通讯频率能保证与SPI接收速率批量,下图中servo节点传输了SPI读取STM32计算的姿态角到control节点,并将其打印:

上述方法仅以本人个人理解实现共享内存数据互传,当然很明显其存在很多问题比如:

(1)有采用应答机制因此在SPI收发完成后才能实现一次数据同步,因此内存数据刷新受SPI通讯频率影响;

(2)内存同步无法实现浮点数的传输,需要自己重新组码和解码;

(3)内存同步仅支持1对1收发;