DDS DataReader 缓存损坏且无法再访问

发布于 2024-10-01 00:49:27 字数 6160 浏览 4 评论 0原文

在 i386 上使用 dds 库进行操作,尝试重复提取样本。我明确地“读取”样本,而不是“获取”样本,因此它们永远不会过期或被删除。

  • 启动两个黑板应用程序,(1) 和 (2)
  • 在两个应用程序中执行读取。这将返回“缓存为空”。
  • 从 (1) 写入,传感器 id: 1,事件 id: 1,值:1。
  • 从 (1) 读取,确认值
  • 从 (2) 读取,确认值
  • 从 (2) 写入,传感器 id: 1,事件 id: 1,值:2。
  • 从(2)读取,“缓存为空”
  • 从(1)读取,“缓存为空”

看来我“破坏”了它!我相信样本的寿命应该是无穷大(或者说我已经理解了......但无法确认!) - 但我无法明确设置它。 topicQos.lifespan.duration 的类型为 Duration_t,但我无法将其设置为“new Duration_t(Duration_t.DURATION_INFINITY_SEC,Duration_t.DURATION_INFINITY_NSEC)””因为它已经敲定了?

public class Main {

  private static final String EVENT_TOPIC_NAME = "EVENTS";
  private static BufferedReader in = null;
  private static PrintStream out = null;
  /**
   * @param args the command line arguments
   */
  public static void main(String[] args) throws IOException {
    in = new BufferedReader(new InputStreamReader(System.in));
    out = new PrintStream(new BufferedOutputStream(System.out));

    DomainParticipantFactory factory = DomainParticipantFactory.TheParticipantFactory;
    DomainParticipant participant = factory.create_participant(100,
            DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
            null,
            StatusKind.STATUS_MASK_NONE);

    EventTypeSupport.register_type(participant, EventTypeSupport.get_type_name());

    TopicQos topicQos = new TopicQos();
    topicQos.durability.direct_communication = true;
    topicQos.durability.kind = DurabilityQosPolicyKind.TRANSIENT_DURABILITY_QOS;
    topicQos.reliability.kind = ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
    topicQos.resource_limits.max_instances = 100;
    topicQos.resource_limits.max_samples = 100;
    topicQos.resource_limits.max_samples_per_instance = 1;
    topicQos.ownership.kind = OwnershipQosPolicyKind.SHARED_OWNERSHIP_QOS;
    topicQos.history.kind = HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS;
    topicQos.history.depth = 1;
    topicQos.history.refilter = RefilterQosPolicyKind.ALL_REFILTER_QOS;
    // Since this is on the same computer, and being typed by a human, we can exepct source timestamps to be useful in ordering
    topicQos.destination_order.kind = DestinationOrderQosPolicyKind.BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;

    Topic topic =
            participant.create_topic(EVENT_TOPIC_NAME,
            EventTypeSupport.get_type_name(),
            topicQos,
            new EventTopicListener(),
            StatusKind.STATUS_MASK_ALL);
    exitIfNullBecause(topic, "Could not create topic");

    Subscriber subscriber = participant.create_subscriber(DomainParticipant.SUBSCRIBER_QOS_DEFAULT,
            null,
            StatusKind.STATUS_MASK_NONE);
    exitIfNullBecause(subscriber, "Could not create subscriber");

    DataReader reader = subscriber.create_datareader(participant.lookup_topicdescription(EVENT_TOPIC_NAME),
            subscriber.DATAREADER_QOS_USE_TOPIC_QOS,
            null,
            StatusKind.STATUS_MASK_NONE);
    exitIfNullBecause(reader, "Could not create reader");
    EventDataReader eventReader = (EventDataReader) reader;

    Publisher publisher = participant.create_publisher(DomainParticipant.PUBLISHER_QOS_DEFAULT,
            null,
            StatusKind.STATUS_MASK_NONE);
    exitIfNullBecause(publisher, "Could not create publisher");

    DataWriter writer = publisher.create_datawriter(topic,
            publisher.DATAWRITER_QOS_USE_TOPIC_QOS,
            null,
            StatusKind.STATUS_MASK_NONE);
    exitIfNullBecause(writer, "Could not create writer");
    EventDataWriter eventWriter = (EventDataWriter)writer;

    boolean loop = true;
    byte inputBuffer[] = new byte[1024];
    String command;
    while(loop){
      print("Enter action [read|write|exit]: ");

      command = in.readLine();
      if(command.startsWith("r")){
        dumpCache(eventReader);
      } else if(command.startsWith("w")) {
        writeCache(eventWriter);
      } else if(command.startsWith("e")){
        println("exiting...");
        System.exit(0);
      } else {
        println("Unknown: '" + command + "'");
      }
    }

    System.exit(0);
  }

  private static void print(String output){
    out.print(output);
    out.flush();
  }
  private static void println(String output){
    out.println(output);
    out.flush();
  }

  private static void exitIfNullBecause(Object thing, String string) {
    if (thing == null) {
      println("ERROR: " + string);
      System.exit(1);
    }
  }
  private static void dumpCache(EventDataReader eventReader) {
    // Something interesting here: I can creat it with a collection as a paramter. TODO: Investigate!
    EventSeq eventSeq = new EventSeq();
    SampleInfoSeq infoSeq = new SampleInfoSeq();

    Event event = null;
    SampleInfo info = null;

    try{
      eventReader.read(eventSeq, infoSeq, 100, SampleStateKind.ANY_SAMPLE_STATE, ViewStateKind.ANY_VIEW_STATE, InstanceStateKind.ANY_INSTANCE_STATE);
    } catch (Exception e){
      println("Cache is empty");
      return;
    }

    Iterator<SampleInfo> infoIter = infoSeq.iterator();
    out.printf("| Sensor ID | Event ID | Value |\n");
    for(int i=0; i<infoSeq.size(); i++){
      event = (Event)eventSeq.get(i);
      out.printf("| %9d | %8d | %5d |\n", event.sensor_id, event.event_id, event.value);
    }
    out.flush();
  }

  private static void writeCache(EventDataWriter eventWriter) throws IOException {
    Event event = new Event();

    print("Sensor ID: ");
    String sensor_id_str = in.readLine();
    print("Event ID: ");
    String event_id_str = in.readLine();
    print("Value: ");
    String value_str = in.readLine();

    Event sample = new Event();
    sample.sensor_id = Integer.parseInt(sensor_id_str);
    sample.event_id = Integer.parseInt(event_id_str);
    sample.value = Integer.parseInt(value_str);

    InstanceHandle_t handle = eventWriter.register_instance(sample);
//    eventWriter.write(sample, handle);
    eventWriter.write_w_timestamp(sample, handle, Time_t.now());


    out.printf("SensorID: %s, EventID: %s, Value: %s\n",sensor_id_str,event_id_str,value_str); out.flush();
  }
}

Operating with a dds library on i386, trying to pull samples repeatedly. I am explicitly 'reading' not 'takeing' the sample, so they should never expire or be removed.

  • Start two blackboard applications, (1) and (2)
  • Perform a read in both applications. This will return "Cache is empty".
  • Write from (1), sensor id: 1, event id: 1, value: 1.
  • Read from (1), confirm values
  • Read from (2), confirm values
  • Write from (2), sensor id: 1, event id: 1, value: 2.
  • Read from (2), "cache is empty"
  • Read from (1), "cache is empty"

It seems like I "broke" it! I believe the lifetime for samples should be inifinity (or so I have come to understand... but cannot confirm!) -- but I can't set it explicitly. topicQos.lifespan.duration is of the type Duration_t, but I cannot set it to a "new Duration_t(Duration_t.DURATION_INFINITY_SEC,Duration_t.DURATION_INFINITY_NSEC)" because it is already finalized?

public class Main {

  private static final String EVENT_TOPIC_NAME = "EVENTS";
  private static BufferedReader in = null;
  private static PrintStream out = null;
  /**
   * @param args the command line arguments
   */
  public static void main(String[] args) throws IOException {
    in = new BufferedReader(new InputStreamReader(System.in));
    out = new PrintStream(new BufferedOutputStream(System.out));

    DomainParticipantFactory factory = DomainParticipantFactory.TheParticipantFactory;
    DomainParticipant participant = factory.create_participant(100,
            DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
            null,
            StatusKind.STATUS_MASK_NONE);

    EventTypeSupport.register_type(participant, EventTypeSupport.get_type_name());

    TopicQos topicQos = new TopicQos();
    topicQos.durability.direct_communication = true;
    topicQos.durability.kind = DurabilityQosPolicyKind.TRANSIENT_DURABILITY_QOS;
    topicQos.reliability.kind = ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS;
    topicQos.resource_limits.max_instances = 100;
    topicQos.resource_limits.max_samples = 100;
    topicQos.resource_limits.max_samples_per_instance = 1;
    topicQos.ownership.kind = OwnershipQosPolicyKind.SHARED_OWNERSHIP_QOS;
    topicQos.history.kind = HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS;
    topicQos.history.depth = 1;
    topicQos.history.refilter = RefilterQosPolicyKind.ALL_REFILTER_QOS;
    // Since this is on the same computer, and being typed by a human, we can exepct source timestamps to be useful in ordering
    topicQos.destination_order.kind = DestinationOrderQosPolicyKind.BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;

    Topic topic =
            participant.create_topic(EVENT_TOPIC_NAME,
            EventTypeSupport.get_type_name(),
            topicQos,
            new EventTopicListener(),
            StatusKind.STATUS_MASK_ALL);
    exitIfNullBecause(topic, "Could not create topic");

    Subscriber subscriber = participant.create_subscriber(DomainParticipant.SUBSCRIBER_QOS_DEFAULT,
            null,
            StatusKind.STATUS_MASK_NONE);
    exitIfNullBecause(subscriber, "Could not create subscriber");

    DataReader reader = subscriber.create_datareader(participant.lookup_topicdescription(EVENT_TOPIC_NAME),
            subscriber.DATAREADER_QOS_USE_TOPIC_QOS,
            null,
            StatusKind.STATUS_MASK_NONE);
    exitIfNullBecause(reader, "Could not create reader");
    EventDataReader eventReader = (EventDataReader) reader;

    Publisher publisher = participant.create_publisher(DomainParticipant.PUBLISHER_QOS_DEFAULT,
            null,
            StatusKind.STATUS_MASK_NONE);
    exitIfNullBecause(publisher, "Could not create publisher");

    DataWriter writer = publisher.create_datawriter(topic,
            publisher.DATAWRITER_QOS_USE_TOPIC_QOS,
            null,
            StatusKind.STATUS_MASK_NONE);
    exitIfNullBecause(writer, "Could not create writer");
    EventDataWriter eventWriter = (EventDataWriter)writer;

    boolean loop = true;
    byte inputBuffer[] = new byte[1024];
    String command;
    while(loop){
      print("Enter action [read|write|exit]: ");

      command = in.readLine();
      if(command.startsWith("r")){
        dumpCache(eventReader);
      } else if(command.startsWith("w")) {
        writeCache(eventWriter);
      } else if(command.startsWith("e")){
        println("exiting...");
        System.exit(0);
      } else {
        println("Unknown: '" + command + "'");
      }
    }

    System.exit(0);
  }

  private static void print(String output){
    out.print(output);
    out.flush();
  }
  private static void println(String output){
    out.println(output);
    out.flush();
  }

  private static void exitIfNullBecause(Object thing, String string) {
    if (thing == null) {
      println("ERROR: " + string);
      System.exit(1);
    }
  }
  private static void dumpCache(EventDataReader eventReader) {
    // Something interesting here: I can creat it with a collection as a paramter. TODO: Investigate!
    EventSeq eventSeq = new EventSeq();
    SampleInfoSeq infoSeq = new SampleInfoSeq();

    Event event = null;
    SampleInfo info = null;

    try{
      eventReader.read(eventSeq, infoSeq, 100, SampleStateKind.ANY_SAMPLE_STATE, ViewStateKind.ANY_VIEW_STATE, InstanceStateKind.ANY_INSTANCE_STATE);
    } catch (Exception e){
      println("Cache is empty");
      return;
    }

    Iterator<SampleInfo> infoIter = infoSeq.iterator();
    out.printf("| Sensor ID | Event ID | Value |\n");
    for(int i=0; i<infoSeq.size(); i++){
      event = (Event)eventSeq.get(i);
      out.printf("| %9d | %8d | %5d |\n", event.sensor_id, event.event_id, event.value);
    }
    out.flush();
  }

  private static void writeCache(EventDataWriter eventWriter) throws IOException {
    Event event = new Event();

    print("Sensor ID: ");
    String sensor_id_str = in.readLine();
    print("Event ID: ");
    String event_id_str = in.readLine();
    print("Value: ");
    String value_str = in.readLine();

    Event sample = new Event();
    sample.sensor_id = Integer.parseInt(sensor_id_str);
    sample.event_id = Integer.parseInt(event_id_str);
    sample.value = Integer.parseInt(value_str);

    InstanceHandle_t handle = eventWriter.register_instance(sample);
//    eventWriter.write(sample, handle);
    eventWriter.write_w_timestamp(sample, handle, Time_t.now());


    out.printf("SensorID: %s, EventID: %s, Value: %s\n",sensor_id_str,event_id_str,value_str); out.flush();
  }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

等待我真够勒 2024-10-08 00:49:27

该问题似乎与lifespan无关。

我不确定您使用的是哪种 DDS 实现,但根据 DDS 规范,您正在 dumpCache 方法中执行零复制操作。如果您忘记归还贷款,您使用的实现可能会表现出这样的行为。

通常,您应该在使用零拷贝进行读取/获取后return_loan

因此,请在 dumpCache 方法的末尾添加以下代码:

try{
  eventReader.return_loan(eventSeq, infoSeq);
} catch (Exception e){
  println("Error returning loan");
  return;
}

The problem does not seem to be related to lifespan.

I'm not sure which DDS implementation you are using but the according to DDS spec, you are performing a zero-copy operation in your dumpCache method. Maybe the implementation that you use behaves like this if you forget to return the loan.

You should normally return_loan after a read/take with zero-copy.

So please add the following code at the end of your dumpCachemethod:

try{
  eventReader.return_loan(eventSeq, infoSeq);
} catch (Exception e){
  println("Error returning loan");
  return;
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文