Python Ros1 melodic

实验平台 ubuntu18.04

实验目的 掌握python ros1 读取与发送

本次针对字符串消息进行实验

首先需要安装ubuntu 对应的ros1 版本

ubuntu18.04 对应的ros版本是melodic

安装

安装的具体步骤如下:

  • sources.list 添加 ros 源
sudo sh -c 'echo "deb http://packages.ros.org/ros/ubuntu $(lsb_release -sc) main" > /etc/apt/sources.list.d/ros-latest.list'
  • 添加秘钥
 sudo apt-key adv --keyserver 'hkp://keyserver.ubuntu.com:80' --recv-key C1CF6E31E6BADE8868B172B4F42ED6FBAB17C654 
  • 更新源
sudo apt update
  • 安装ROS(melodic)
sudo apt install ros-melodic-desktop-full
  • 安装依赖包
sudo apt install python-rosdep python-rosinstall python-rosinstall-generator python-wstool build-essential
  • 添加到自动启动文件里
echo "source /opt/ros/melodic/setup.bash" >> ~/.bashrc
source ~/.bashrc

ROS 使用的流程

  1. 启动roscore(服务端)
  2. 发送topic
  3. 接收topic

ros基本指令:

启动主节点

roscore

查看话题名称

ros topic list

录制话题数据

rosbag record *topicname

录制所有话题数据

rosbag record -a 

播放bag 包

rosbag play *.bag

Python 代码

实验目的:

学习python 对ros的使用

代码启动ros主节点服务端

代码完成消息的订阅与发送

实验步骤:

python 进程启动 roscore

函数:

def start_ros_core():
    result_roscore = os.system('roscore')
# 使用代码
ros1bag_core_process = multiprocessing.Process(target=start_ros_core)
ros1bag_core_process.start()

发布消息

python 构建发布消息的节点

rospy.init_node("test_pub", anonymous=True))#初始化节点 名称:test_pub

python 构建发布话题的发布者

pub_parking = rospy.Publisher('mystring', String, queue_size=10)

循环每隔一秒发送消息

def sub_mutil():
    num = 0
    while True:
        my_msg = String()
        my_msg.data = f'{num}'
        pub_parking.publish(my_msg)
        time.sleep(1)
        print(f"发送消息:{num}")
        num+=1
sub_mutil()

订阅消息

python 构建接收的节点

rospy.init_node("test_sub", anonymous=True)#初始化节点 名称:test_sub

python 订阅节点 回调函数的定义

def call_back_String(msg):
    print(f"接收到消息: {msg.data}")
rospy.Subscriber('mystring',String, call_back_String)
rospy.spin()

展示:

Python Ros1 melodic

所有代码

发布者代码:

import os
import sys
import time
import rospy
import multiprocessing
from std_msgs.msg import String
def start_ros_core():
    result_roscore = os.system('roscore')
# 使用代码
ros1bag_core_process = multiprocessing.Process(target=start_ros_core)
ros1bag_core_process.start()
rospy.init_node("test_pub", anonymous=True)#初始化节点 名称:test
def sub_mutil():
    num = 0
    while True:
        my_msg = String()
        my_msg.data = f'{num}'
        pub_parking.publish(my_msg)
        time.sleep(1)
        print(f"发送消息:{num}")
        num+=1
#发布话题
pub_parking = rospy.Publisher('mystring', String, queue_size=10)
sub_mutil()

订阅者代码

import rospy
from std_msgs.msg import String
rospy.init_node("test_sub", anonymous=True)#初始化节点 名称:test
def call_back_String(msg):
    print(f"接收到消息: {msg.data}")
rospy.Subscriber('mystring',String, call_back_String)
rospy.spin()
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章