使用ROS查询KB时的分割故障

发布于 2025-02-13 05:34:43 字数 3753 浏览 2 评论 0原文

我构建了一个ros软件包来运行此脚本,称为planner_node.py与prologswoolbles Boasen(KB)接口:

#! /bin/env python3

from pyswip import Prolog
import rospy
from std_msgs.msg import String

class planner:

    def __init__(self, scene_file="src/mmi/scenes/test.pl"):

        self.scene_file = scene_file
        self.data = None
        self.prolog = Prolog()
        self.prolog.consult(self.scene_file)
        self.init_ros()

    def init_ros(self):

        rospy.init_node("planner_node")
        self.rate = rospy.Rate(10)
       
        self.what_next = rospy.Publisher("what_next_topic", String, queue_size=10)
        self.check_sat = rospy.Publisher("check_sat_topic", String, queue_size=10)
        rospy.Subscriber('check_sat_topic', String, self.proceed_next_action)
       

    def get_plan_status(self):
        plan_done = list(self.prolog.query("plan_done(A)."))[0]['A']
        if plan_done == 'true':
            result = True
        else:
            result = False
        return result

    def proceed_next_action(self, data):
        self.get_next_action()
    
    # publishes on what_next_topic
    def get_next_action(self):
        if self.get_plan_status():
            self.what_next.publish(self.curr_action_id)
            return
           
        ####### get the next action
        next_actions = list(self.prolog.query("pending_actions(Action_List)."))
        act = next_actions[0]
        act_list = act['Action_List']
        self.curr_action_id = act_list[0]

        self.prolog = self.apply_action(self.prolog, self.curr_action_id)

        self.data = self.curr_action_id
       
    def list_to_string(self, list1):
        if list1 == []:
            return "[]"
        result = "["
        for elt in list1:
            result += str(elt)
            result += ","

        return result[:-1] + "]"

    def apply_action(self, prolog, curr_action_id):
        """
        updates the data in the prolog file self.file_path
        """

        print("applying "+str(curr_action_id)+"  ... ")
       
        ############
        all_applied_acts = list(prolog.query("all_applied_actions(A)."))[0]['A']
        all_applied_actions = "all_applied_actions("+self.list_to_string(all_applied_acts)+")"
        prolog.retract(all_applied_actions)

        all_applied_acts.insert(0,curr_action_id)
        all_applied_actions = "all_applied_actions("+self.list_to_string(all_applied_acts)+")"
        prolog.assertz(all_applied_actions)
        ############
        pending = list(prolog.query("pending_actions(A)."))[0]['A']
        pending_actions = "pending_actions("+self.list_to_string(pending)+")"
        prolog.retract(pending_actions)

        pending.remove(curr_action_id)
        pending_actions = "pending_actions("+self.list_to_string(pending)+")"
        prolog.assertz(pending_actions)
        ############
        if pending == []:
            prolog.assertz('plan_done(true)')
            prolog.retract('plan_done(false)')
        ############

        return prolog

## Main
if __name__ == '__main__':

    planner1 = planner()
    planner1.get_next_action()   

    while not rospy.is_shutdown():
        planner1.what_next.publish(planner1.data)
        planner1.check_sat.publish("yes")

这是知识库test.pl :

:- dynamic(all_applied_actions/1).
:- dynamic(pending_actions/1).
:- dynamic(plan_done/1).

% % % % % % % % % % % % % % % %

all_applied_actions([]).
pending_actions([2,3,4,6,7,8,10,11,12,14]).

plan_done(false).

% % % % % % % % % % % % % % % %

我会收到此错误:分割故障(Core丢弃)在运行此rosnode时。 它可能是由序言查询引起的,但我不知道问题在哪里,也不知道如何解决。

您能告诉我如何解决此细分错误吗?

I built a ROS Package to run this script called planner_node.py to interface with Prolog Knowledge Base(KB):

#! /bin/env python3

from pyswip import Prolog
import rospy
from std_msgs.msg import String

class planner:

    def __init__(self, scene_file="src/mmi/scenes/test.pl"):

        self.scene_file = scene_file
        self.data = None
        self.prolog = Prolog()
        self.prolog.consult(self.scene_file)
        self.init_ros()

    def init_ros(self):

        rospy.init_node("planner_node")
        self.rate = rospy.Rate(10)
       
        self.what_next = rospy.Publisher("what_next_topic", String, queue_size=10)
        self.check_sat = rospy.Publisher("check_sat_topic", String, queue_size=10)
        rospy.Subscriber('check_sat_topic', String, self.proceed_next_action)
       

    def get_plan_status(self):
        plan_done = list(self.prolog.query("plan_done(A)."))[0]['A']
        if plan_done == 'true':
            result = True
        else:
            result = False
        return result

    def proceed_next_action(self, data):
        self.get_next_action()
    
    # publishes on what_next_topic
    def get_next_action(self):
        if self.get_plan_status():
            self.what_next.publish(self.curr_action_id)
            return
           
        ####### get the next action
        next_actions = list(self.prolog.query("pending_actions(Action_List)."))
        act = next_actions[0]
        act_list = act['Action_List']
        self.curr_action_id = act_list[0]

        self.prolog = self.apply_action(self.prolog, self.curr_action_id)

        self.data = self.curr_action_id
       
    def list_to_string(self, list1):
        if list1 == []:
            return "[]"
        result = "["
        for elt in list1:
            result += str(elt)
            result += ","

        return result[:-1] + "]"

    def apply_action(self, prolog, curr_action_id):
        """
        updates the data in the prolog file self.file_path
        """

        print("applying "+str(curr_action_id)+"  ... ")
       
        ############
        all_applied_acts = list(prolog.query("all_applied_actions(A)."))[0]['A']
        all_applied_actions = "all_applied_actions("+self.list_to_string(all_applied_acts)+")"
        prolog.retract(all_applied_actions)

        all_applied_acts.insert(0,curr_action_id)
        all_applied_actions = "all_applied_actions("+self.list_to_string(all_applied_acts)+")"
        prolog.assertz(all_applied_actions)
        ############
        pending = list(prolog.query("pending_actions(A)."))[0]['A']
        pending_actions = "pending_actions("+self.list_to_string(pending)+")"
        prolog.retract(pending_actions)

        pending.remove(curr_action_id)
        pending_actions = "pending_actions("+self.list_to_string(pending)+")"
        prolog.assertz(pending_actions)
        ############
        if pending == []:
            prolog.assertz('plan_done(true)')
            prolog.retract('plan_done(false)')
        ############

        return prolog

## Main
if __name__ == '__main__':

    planner1 = planner()
    planner1.get_next_action()   

    while not rospy.is_shutdown():
        planner1.what_next.publish(planner1.data)
        planner1.check_sat.publish("yes")

Here is the Knowledge Base test.pl:

:- dynamic(all_applied_actions/1).
:- dynamic(pending_actions/1).
:- dynamic(plan_done/1).

% % % % % % % % % % % % % % % %

all_applied_actions([]).
pending_actions([2,3,4,6,7,8,10,11,12,14]).

plan_done(false).

% % % % % % % % % % % % % % % %

I get this Error: Segmentation fault (core dumped) while running this rosnode.
It is probably caused by the prolog queries but I do not know where is the issue nor how to fix it.

Can you please tell me how to resolve this segmentation fault please?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

心舞飞扬 2025-02-20 05:34:43

至少有一个问题是您的节点旋转的速度。您创建一个速率变量,但实际上永远不会使用它。这将导致出版物尽可能快地发送出去。这也将创建一个非常快速的反馈循环,以check_sat_topic主题。您的最后一行应该看起来像这样:

while not rospy.is_shutdown():
    planner1.what_next.publish(planner1.data)
    planner1.check_sat.publish("yes")
    self.rate.sleep()

At least one issue y out have here is how fast your node is spinning. You create a rate variable but you never actually use it. This will cause publications to be sent out as fast as possible. This also will create a very fast feedback loop to the check_sat_topic topic. Your last lines should look like this instead:

while not rospy.is_shutdown():
    planner1.what_next.publish(planner1.data)
    planner1.check_sat.publish("yes")
    self.rate.sleep()
或十年 2025-02-20 05:34:43

我通过使用 rosprolog 而不是pyswip来解决问题。

这是示例

#!/usr/bin/env python

import roslib; roslib.load_manifest('rosprolog')

import rospy
from rosprolog_client import PrologException, Prolog

if __name__ == '__main__':
    rospy.init_node('example_query_prolog_kb')
    prolog = Prolog()
    query = prolog.query("perishable(X), location(X,Y,_)")
    for solution in query.solutions():
        print 'Foenter code hereund solution. Product: %s, Location: %s' % (solution['X'], solution['Y'])
    query.finish()

I resolved the issue by using rosprolog instead of pyswip.

Here is an example

#!/usr/bin/env python

import roslib; roslib.load_manifest('rosprolog')

import rospy
from rosprolog_client import PrologException, Prolog

if __name__ == '__main__':
    rospy.init_node('example_query_prolog_kb')
    prolog = Prolog()
    query = prolog.query("perishable(X), location(X,Y,_)")
    for solution in query.solutions():
        print 'Foenter code hereund solution. Product: %s, Location: %s' % (solution['X'], solution['Y'])
    query.finish()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文