SMACH 是状态机的意思,是基于Python实现的一个功能强大且易于扩展的库。 smach本质上并不依赖于ROS,可以用于任意Python项目,不过在ROS中元功能包executive_smachsmachROS很好的集成在了一起,可以为机器人复杂应用开发提供任务级的状态机框架,此外元功能包还集成了actionlibsmach_viewer

为避免误导,本文以下提到的SMACH均指ROS中的SMACH功能包。

 

一、关于SMACH

 

1.1 什么时候用

在很多应用场景中,我们需要设计一些复杂的机器人任务,任务中包含多个状态模块,而这些状态模块之间在某些情况下会发生跳转,这就是SMACH可以发挥作用的地方。

    • 快速原型设计:基于Python语法的SMACH可以实现状态机原型的快速开发测试;

    • 复杂状态机模型:SMACH支持设计、维护、调试大型复杂的状态机;

    • 可视化:SMACH提供可视化工具smach_viewer ,可以看到完整状态机的状态跳转、数据流等信息

 

1.2 什么时候不用

在某些场景下,SMACH也并不适用:

    • 非结构化任务:非结构化任务调度中可能存在未知的状态跳转

    • 低层次系统:SMACH适用于任务机调度,不适合相对简单、不包含任务级调度的系统。

    • 拆分模块:SMACH的使用并不是为了让我们将模块拆分

 

二、安装

 

无论是ROS indigo还是kinetic中,都有smach的二进制安装包,可以直接使用如下命令安装:

sudo apt-get install ros-kinetic-executive-smach
sudo apt-get install ros-kinetic-executive-smach-visualization

 

smach提供了不少官方例程源码,可以直接下载运行,不过其中很多例程没有加入内部可视化服务器,所以我对代码进行了一些修改,大家可以下载本博客的修改源码

 

三、状态机跑起来

 

先看一个简单的代码,state_machine_simple_introspection.py

#!/usr/bin/env python

import rospy
import smach
import smach_ros

# define state Foo
class Foo(smach.State):
  def __init__(self):
  smach.State.__init__(self, outcomes=['outcome1','outcome2'])
  self.counter = 0

  def execute(self, userdata):
  rospy.loginfo('Executing state FOO')
  if self.counter < 3:
  self.counter += 1
  return 'outcome1'
  else:
  return 'outcome2'

# define state Bar
class Bar(smach.State):
  def __init__(self):
  smach.State.__init__(self, outcomes=['outcome2'])

  def execute(self, userdata):
  rospy.loginfo('Executing state BAR')
  return 'outcome2'

# main
def main():
  rospy.init_node('smach_example_state_machine')

  # Create a SMACH state machine
  sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])

  # Open the container
  with sm:
  # Add states to the container
  smach.StateMachine.add('FOO', Foo(), 
  transitions={'outcome1':'BAR', 
  'outcome2':'outcome4'})
  smach.StateMachine.add('BAR', Bar(), 
  transitions={'outcome2':'FOO'})

  # Create and start the introspection server
  sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
  sis.start()

  # Execute SMACH plan
  outcome = sm.execute()

  # Wait for ctrl-c to stop the application
  rospy.spin()
  sis.stop()

if __name__ == '__main__':
  main()

 

使用如下命令运行,先来看下效果

roscore
rosrun smach_tutorials state_machine_simple.py

2

 

在终端中可以看到状态的跳转,但是这样的信息并不是很清晰,我们可以启动一个神器来可视化显示状态机

rosrun smach_viewer smach_viewer.py

1

 

四、代码分

 

通过上边运行的效果你可能还没看明白,接下来我们就对照代码进行分析

作为状态机,首先需要有状态,这个例程中有两个状态:FOOBAR,我们来看一下这两个状态在代码中的定义

# define state Foo
class Foo(smach.State):
  def __init__(self):
    smach.State.__init__(self, outcomes=['outcome1','outcome2'])
    self.counter = 0

  def execute(self, userdata):
    rospy.loginfo('Executing state FOO')
    if self.counter < 3:
      self.counter += 1
      return 'outcome1'
    else:
      return 'outcome2'

# define state Bar
class Bar(smach.State):
  def __init__(self):
    smach.State.__init__(self, outcomes=['outcome2'])

  def execute(self, userdata):
    rospy.loginfo('Executing state BAR')
    return 'outcome2'

 

这两个状态都是通过Python的函数进行定义的,而且结构相似,都包含初始化(__init__)和执行(execute)这两个函数

    1. 初始化函

初始化函数用来初始化该状态类,调用smach中状态的初始化函数,同时需要定义输出状态:outcome1outcome2

这里的outcome代表状态结束时的输出值,使用字符串表示,由用户定义取值的范围,例如我们可以定义状态执行是否成功:['succeeded', 'failed', 'awesome']. 每个状态的输出值可以有多个,根据不同额输出值有可能跳转到不同的下一个状态

需要注意的是初始化函数中不能阻塞,如果需要实现同步等阻塞功能,可以使用多线程实现

    1. 执行函

执行函数就是每个状态中的具体工作内容了,可以进行阻塞工作,当工作后需要返回定义的输出值,该状态结束

 

再来看一下main函数

# main
def main():
  rospy.init_node('smach_example_state_machine')

  # Create a SMACH state machine
  sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])

  # Open the container
  with sm:
    # Add states to the container
    smach.StateMachine.add('FOO', Foo(), transitions={'outcome1':'BAR', 'outcome2':'outcome4'})
    smach.StateMachine.add('BAR', Bar(), transitions={'outcome2':'FOO'})

  # Create and start the introspection server
  sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
  sis.start()

  # Execute SMACH plan
  outcome = sm.execute()

  # Wait for ctrl-c to stop the application
  rospy.spin()
  sis.stop()

 

main函数中,首先初始化ROS节点,然后就使用StateMachine创建一个状态机,并且指定状态机执行结束后的最终输出值有两个:outcome4outcome5.

SMACH状态机是一个容器,我们可以使用add方法添加需要的状态到状态机容器当中,同时需要设置状态之间的跳转关系


smach.StateMachine.add('FOO', Foo(), transitions={'outcome1':'BAR', 'outcome2':'outcome4'})

 

例如这里我们在状态机中添加一个名为“FOO”的状态,该状态的类就是我们之前定义的Footransitions代表状态的跳转,如果FOO状态执行输出outcome1时,则跳转到“BAR”状态,如果执行输出outcome2时,则结束这个状态机,并且输出outcome4

 

还记得我们上边看到的可视化界面么,为了将状态机可视化显示,我们需要在代码中加入可视化服务器

# Create and start the introspection server
  sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
  sis.start()

 

IntrospectionServer()方法用来创建内部可视化服务器,有三个参数:第一个参数时服务器的名字,可以根据需要自由给定;第二个参数时索要监测的状态机;第三个参数代表状态机的层级,因为SMACH状态机支持嵌套,状态内部还可以有自己的状态机

 

然后就可以使用execute()方法开始执行状态机了,执行结束后需要讲内部可视化服务器停止

3

 

现在再来回顾整个状态机,从图中我们可以看到,状态机开始工作后首先跳入我们添加的第一个状态“FOO”,然后在该状态中累加counter变量,counter小于3时,会输出outcome1,状态结束后就跳转到“BAR”状态。在“BAR”状态中什么都没做,输出outcome2回到“FOO”状态。就这样来回几次之后,counter等于3“FOO”状态的输出值变成outcome2,继而跳转到outcome4,也就代表着有限状态机运行结束。outcome5全程并没有涉及到,所以在图上成为了一个孤立的节点

 

ROS中的SMACH状态机是不是也并不复杂,将上边的状态机想象成一个简单的机器人应用:机器人去抓取桌子上的杯子,如果抓取到就结束任务,如果抓取不到就继续尝试,尝试3次还没抓到,就放弃抓取,结束任务

SMACH的功能远远不止如此,比如这是一个复杂的状态机。。。

5 

参考资料

http://wiki.ros.org/smach/Tutorials

http://wiki.ros.org/smach/Tutorials/Smach%20Viewer

http://www.cnblogs.com/cv-pr/p/5155828.html