import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
import threading
from queue import Queue
from geometry_msgs.msg import Pose
from .path_planner import PathPlanner
from .vs_assembler import Assembler
from .vs_controller import ControllerService
from .task_publisher import TaskPublisher
from cordyceps_interfaces.srv import CustomPathPlanner, CustomRobotAssembler, Controller, CheckThread
from cordyceps_interfaces.msg import RobotRoutes, Task
[docs]class VsManager(Node):
def __init__(self):
"""Constructor for the VsManager class. Initializes the ROS2 node and creates the service."""
super().__init__('vs_manager')
self.declare_parameters(
namespace='',
parameters=[
('number_of_robots', rclpy.Parameter.Type.INTEGER),
('diameter', rclpy.Parameter.Type.DOUBLE),
('path_filename', rclpy.Parameter.Type.STRING),
]
)
self.number_of_robots = self.get_parameter('number_of_robots').value
self.diameter = self.get_parameter('diameter').value
self.path_filename = self.get_parameter('path_filename').value
self.task_queue = Queue(1)
self.task_thread = threading.Thread(target=self.task_executor)
self.task_thread.start()
self.robot_route_client = self.create_client(CustomPathPlanner, 'get_robot_routes')
self.assembler_client = self.create_client(CustomRobotAssembler, 'get_robot_vs_ref_pose')
self.start_route_follow_client = self.create_client(Controller, 'start_follow_route')
self.check_thread_state_client = self.create_client(CheckThread, 'check_thread_state')
self.task_subscriber = self.create_subscription(Task, 'vs_manager/task', self.task_callback, 10)
[docs] def get_parameters(self):
return self.number_of_robots, self.diameter, self.path_filename
[docs] def task_callback(self, msg:Task):
"""Callback function for the task subscriber. Adds the task to the task queue.
:param Task msg: The task message that is received from the task topic."""
self.task_queue.put(msg)
[docs] def task_executor(self):
"""Executes the tasks in the task queue."""
while True:
task = self.task_queue.get(block=True)
vs_ref_pose = self.request_transformed_bot_poses(task)
routes = self.request_routes(task, vs_ref_pose)
self.controll_vs(routes)
[docs] def request_routes(self, task:Task, vs_ref_pose:list):
"""Requests the routes from the path planner service.
:param Task task: The task for which the routes are requested.
:param list vs_ref_pose: The reference poses for each robot.
:returns: The routes for each robot."""
req = CustomPathPlanner.Request()
req.task = task
req.vs_ref_pose = vs_ref_pose
while True:
if self.robot_route_client.wait_for_service():
break
respone = self.robot_route_client.call(req)
return respone.robot_routes
[docs] def controll_vs(self, routes: RobotRoutes):
"""Calls the controller service to start the path following.
:param RobotPaths paths: The paths for each robot.
"""
request = Controller.Request()
request.robot_routes = routes
self.start_route_follow_client.wait_for_service()
self.start_route_follow_client.call(request)
is_alive = True
request = CheckThread.Request()
while is_alive:
is_alive = self.check_thread_state_client.call(request).is_alive
[docs]def main(args=None):
rclpy.init(args=args)
vs_manager = VsManager()
controller = ControllerService()
planner = PathPlanner(vs_manager.path_filename)
assembler = Assembler()
task_publisher = TaskPublisher(vs_manager.number_of_robots, vs_manager.diameter)
executor = MultiThreadedExecutor()
executor.add_node(vs_manager)
executor.add_node(controller)
executor.add_node(planner)
executor.add_node(assembler)
executor.add_node(task_publisher)
executor.spin()
vs_manager.destroy_node()
controller.destroy_node()
planner.destroy_node()
assembler.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()