Reading AMF objects with a Flex socket

Posted on 2024-11-08 05:11:42

I'm currently trying to communicate between Java and Flex using sockets and AMF-serialized objects.

On the Java side, I use Amf3Input and Amf3Output from BlazeDS (flex-messaging-common.jar and flex-messaging-core.jar).

The connection is established correctly, and if I send an object from Flex to Java, I can read it easily:

Flex side:

protected function button2_clickHandler(event:MouseEvent):void
{
    var tmp:FlexAck = new FlexAck;
    tmp.id="123456789123456789123456789";
    tmp.name="A";
    tmp.source="Aaaaaa";
    tmp.ackGroup=false;
    s.writeObject(tmp);
    s.flush();
}

Java side:

ServerSocket servSoc = new ServerSocket(8888);
Socket s = servSoc.accept();
Amf3Output amf3Output = new Amf3Output(SerializationContext.getSerializationContext());
amf3Output.setOutputStream(s.getOutputStream());
Amf3Input amf3Input = new Amf3Input(SerializationContext.getSerializationContext());
amf3Input.setInputStream(s.getInputStream());
while (true)
{
    try
    {
        Object obj = amf3Input.readObject();
        if (obj != null) {
            if (obj instanceof AckOrder) {
                System.out.println(((AckOrder) obj).getId());
            }
        }
    }
    catch (Exception e)
    {
        e.printStackTrace();
        break;
    }
}
amf3Output.close();
amf3Input.close();
servSoc.close();

This direction works perfectly; the problem is reading objects sent from the Java side.

The code I use in Java is:

for(int i=0;i<10;i++){
    ack = new AckOrder(i,"A","B", true);
    amf3Output.writeObject(ack);
    amf3Output.writeObjectEnd();
    amf3Output.flush();
}

I have a handler on ProgressEvent.SOCKET_DATA:

trace((s.readObject() as FlexAck).id);

But I get errors such as:
Error #2030: End of file detected
Error #2006: Index out of bounds

If I add some manipulation of ByteArrays, I manage to read the first object, but not the following ones.

s.readBytes(tmp,tmp.length);
content = clone(tmp);
(content.readObject());
trace("########################## OK OBJECT RECEIVED");
var ack:FlexAck = (tmp.readObject() as FlexAck); 
trace("**********************> id = "+ack.id);

I've spent many hours trying to find something in several forums and elsewhere, but nothing helped.

So if someone could help me it would be great.

Thanks

Sylvain

EDIT:

Here is an example that I thought should work, but doesn't. I hope it better illustrates what I'm aiming to do (a permanent socket connection with an exchange of messages).

Java class:

import java.net.ServerSocket;
import java.net.Socket;
import awl.oscare.protocol.AckOrder;
import flex.messaging.io.SerializationContext;
import flex.messaging.io.amf.Amf3Input;
import flex.messaging.io.amf.Amf3Output;


public class Main {
    public static void main(String[] args) {
        while (true)
        {
            try {
                ServerSocket servSoc = new ServerSocket(8888);
                Socket s = servSoc.accept();
                System.out.println("connection accepted");
                Amf3Output amf3Output = new Amf3Output(SerializationContext.getSerializationContext());
                amf3Output.setOutputStream(s.getOutputStream());
                Amf3Input amf3Input = new Amf3Input(SerializationContext.getSerializationContext());
                amf3Input.setInputStream(s.getInputStream());
                while (true)
                {
                    try
                    {
                        System.out.println("Reading object");
                        Object obj = amf3Input.readObject();
                        if (obj != null)
                        {
                            System.out.println(obj.getClass());
                            if (obj instanceof AckOrder)
                            {
                                AckOrder order = new AckOrder();
                                order.setId(((AckOrder) obj).getId());
                                order.setName(((AckOrder) obj).getName());
                                order.setSource(((AckOrder) obj).getSource());
                                order.setAckGroup(((AckOrder) obj).isAckGroup());
                                System.out.println(((AckOrder) obj).getId());
                                amf3Output.writeObject(order);
                                amf3Output.writeObjectEnd();
                                amf3Output.flush();
                            }
                        }
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                        break;
                    }
                }
                amf3Output.close();
                amf3Input.close();
                servSoc.close();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

Java serializable object:

package protocol;

import java.io.Serializable;

public class AckOrder implements Serializable {
    private static final long serialVersionUID = 5106528318894546695L;
    private String id;
    private String name;
    private String source;
    private boolean ackGroup = false;

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getSource() {
        return this.source;
    }

    public void setAckGroup(boolean ackGroup) {
        this.ackGroup = ackGroup;
    }

    public boolean isAckGroup() {
        return this.ackGroup;
    }

    public AckOrder()
    {
        super();
    }
}

Flex side:

Main Flex code:

<fx:Script>
    <![CDATA[
        import mx.collections.ArrayCollection;
        import mx.controls.Alert;
        import mx.events.FlexEvent;
        import mx.utils.object_proxy;


        private var _socket:Socket = new Socket();

        private function onCreationComplete():void
        {
            this._socket.connect("localhost",8888);
            this._socket.addEventListener(ProgressEvent.SOCKET_DATA, onData);
        }

        private function onData(e:ProgressEvent):void
        {

            if(this._socket.bytesAvailable)
            {
                this._socket.endian = Endian.LITTLE_ENDIAN;
                var objects:Array = [];
                try{
                    while(this._socket.bytesAvailable > 0)
                    {
                        objects.push(this._socket.readObject());
                    }
                }catch(e:Error){trace(e.message);}
                trace("|"+(objects)+"|");
            }

        }

        protected function sendButton_clickHandler(event:MouseEvent):void
        {
            var tmp:FlexAck = new FlexAck;
            tmp.id="1";
            tmp.name="A";
            tmp.source="B";
            tmp.ackGroup=false;
            this._socket.writeObject(tmp);
            this._socket.flush();
        }


    ]]>
</fx:Script>
<s:Button x="0" y="0" name="send" label="Send" click="sendButton_clickHandler(event)"/>

Flex serializable object:

package
{

[Bindable]
[RemoteClass(alias="protocol.AckOrder")] 
public class FlexAck
{
    public function FlexAck()
    {
    }

    public var id:String;
    public var name:String;
    public var source:String;
    public var ackGroup:Boolean;

}
}

Edit 25/05/2011:

I've added these listeners in my Flex code:

this._socket.addEventListener(Event.ACTIVATE,onActivate);
this._socket.addEventListener(Event.CLOSE,onClose);
this._socket.addEventListener(Event.CONNECT,onConnect);
this._socket.addEventListener(Event.DEACTIVATE,onDeactivate);
this._socket.addEventListener(IOErrorEvent.IO_ERROR,onIOerror);
this._socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR,onSecurityError);

But no errors are reported, and I still can't receive objects correctly.

Comments (3)

贩梦商人 2024-11-15 05:11:42

On the server, you have to serialize the AMF data into a byte array first and then write that array to the socket:

final ByteArrayOutputStream baos = new ByteArrayOutputStream();
amf3Output.setOutputStream(baos);
amf3Output.writeObject(order);
amf3Output.flush();
amf3Output.close();
s.getOutputStream().write(baos.toByteArray());

Then

this._socket.readObject()

works as expected!

℉絮湮 2024-11-15 05:11:42

Hi, the problem is caused by the following:

  1. An AMF stream is stateful. When it serializes objects, it compresses them relative to objects that have already been written.

  2. Compression is achieved by referencing previously sent class descriptions, string values and objects by index (for example, if the first string you sent was "heloWorld", then when you later send that string again, the AMF stream will send string index 0).

  3. Unfortunately, ByteArray and Socket do not maintain reference tables between readObject calls. Thus, even if you keep appending newly received data to the end of the same ByteArray object, each call to readObject instantiates new reference tables and discards the previously created ones (so references only work for repeats of the same string within a single object tree).

  4. In your example, you always write the same string values to the properties. Thus, when you send the second object, its string properties are serialized not as strings but as references to the strings in the previously written object.

The solution is to create a new AMF stream for each object you send.

This is complete rubbish, of course(!) It means we can't really utilize the compression in custom protocols. It would be much better if our protocols could decide when to reset these reference tables, perhaps when they get too big.

For example, if you have an RPC protocol, it would be nice to have an AMF stream pass the remote method names as references rather than strings, for speed...

I haven't checked, but I think this sort of thing is done by RTMP. The reason it probably wasn't made available in developer objects like ByteArray and Socket (sigh, I hope this isn't true) is that Adobe wants to push us towards LCDS...

Addendum/edit: just found this, which provides a solution http://code.google.com/p/cvlib/
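
To make the reference-table point concrete, here is a toy model in plain Java. It is only a sketch under stated assumptions: the "str:" and "ref:" tokens and the RefTableDemo, Writer and Reader names are invented for illustration and are not the AMF3 wire format or any BlazeDS API. It shows a writer that keeps its table across messages, and a reader that starts with an empty table for every message, which is the mismatch described above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT the AMF3 wire format, just the same idea of replacing
// a repeated string with an index into a per-stream reference table.
class RefTableDemo {

    // A writer that remembers every string it has emitted so far.
    static class Writer {
        private final List<String> table = new ArrayList<>();

        String write(String value) {
            int index = table.indexOf(value);
            if (index >= 0) {
                return "ref:" + index;   // already sent: emit a back-reference
            }
            table.add(value);
            return "str:" + value;       // first occurrence: emit the literal
        }
    }

    // A reader that resolves back-references against its own table.
    static class Reader {
        private final List<String> table = new ArrayList<>();

        String read(String token) {
            if (token.startsWith("ref:")) {
                int index = Integer.parseInt(token.substring(4));
                return table.get(index); // fails if this reader never saw the literal
            }
            String value = token.substring(4);
            table.add(value);
            return value;
        }
    }

    public static void main(String[] args) {
        Writer shared = new Writer();
        String first = shared.write("A");   // literal
        String second = shared.write("A");  // back-reference to index 0

        // A fresh reader per message cannot resolve the back-reference.
        new Reader().read(first);
        try {
            new Reader().read(second);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("second message could not be decoded");
        }

        // A fresh writer per message always emits literals, so it decodes fine.
        String independent = new Writer().write("A");
        System.out.println(new Reader().read(independent));
    }
}
```

In this model, using a new Writer per message costs the compression but makes every message decodable on its own, which mirrors the "new AMF stream for each object" fix.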

只想待在家 2024-11-15 05:11:42

After looking at the code, I think what you want to do on the Java end is this:

for(int i=0;i<10;i++){
    ack = new AckOrder(i,"A","B", true);
    amf3Output.writeObject(ack);
}
amf3Output.flush();

When you call 'flush', you send the buffered data over the socket, so your original loop sent one object at a time. On the Flex end, you should always check the length of the object and make sure you don't read past it, which would cause this error.
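
One common way to let the receiver know the length of each object is to prefix every message with its byte count. The following is a minimal sketch of that idea in plain Java, not something taken from the question's code: LengthPrefixSketch, writeFrame and readFrame are hypothetical names, and the payload stands in for whatever bytes the AMF serializer produced for one object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper: frames each message as [4-byte length][payload].
class LengthPrefixSketch {

    // Writes a 4-byte big-endian length followed by the payload bytes.
    static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
    }

    // Reads one frame; blocks until the whole payload has arrived.
    static byte[] readFrame(DataInputStream in) throws IOException {
        int length = in.readInt();
        byte[] payload = new byte[length];
        in.readFully(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        // An in-memory stream stands in for the socket's output stream.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        writeFrame(out, new byte[] {1, 2, 3});
        writeFrame(out, new byte[] {4, 5});

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
        System.out.println(readFrame(in).length); // prints 3
        System.out.println(readFrame(in).length); // prints 2
    }
}
```

DataOutputStream.writeInt is big-endian, which matches the default byte order of a Flash Socket, so a Flex client could read the prefix with readInt() as long as its endian property is left at the default, and then wait until bytesAvailable covers the payload before calling readObject().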

EDIT:

private var _socket:Socket = new Socket();

private function onCreationComplete():void
{
    // Add connection socket info here
    this._socket.addEventListener(ProgressEvent.SOCKET_DATA, onData);
}

// This gets called every time we get new info, as in after the server flushes
private function onData(e:ProgressEvent):void
{
    if(this._socket.bytesAvailable)
    {
        this._socket.endian = Endian.LITTLE_ENDIAN; // Might not be needed, but often is
        // Try to get objects
        var objects:Array = [];
        try{
            while(this._socket.bytesAvailable > 0)
            {
                objects.push(this._socket.readObject());
            }
        }catch(e:Error){}
        // Do something with objects array
    }
}

The onData function is called continually (every time the server sends info) since everything is asynchronous.
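
Because the handler fires whenever some bytes arrive, a single call may see only part of an object, or several objects at once. A receiver therefore usually needs to buffer data across calls. The sketch below shows that buffering logic in plain Java rather than ActionScript, assuming a hypothetical framing of a 4-byte big-endian length followed by the payload and a trusted sender (no validation of the length field); FrameAccumulator and feed are illustrative names only.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical receiver-side buffer for [4-byte length][payload] frames.
class FrameAccumulator {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Appends a newly received chunk and returns every frame that is now complete.
    List<byte[]> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        ByteBuffer buffer = ByteBuffer.wrap(pending.toByteArray()); // big-endian by default
        List<byte[]> frames = new ArrayList<>();
        while (buffer.remaining() >= 4) {
            buffer.mark();
            int length = buffer.getInt();
            if (buffer.remaining() < length) {
                buffer.reset(); // payload incomplete: rewind and wait for more data
                break;
            }
            byte[] payload = new byte[length];
            buffer.get(payload);
            frames.add(payload);
        }
        // Keep only the unconsumed tail for the next call.
        pending.reset();
        pending.write(buffer.array(), buffer.position(), buffer.remaining());
        return frames;
    }

    public static void main(String[] args) {
        FrameAccumulator acc = new FrameAccumulator();
        // First chunk: header for a 3-byte payload, but only 1 payload byte so far.
        System.out.println(acc.feed(new byte[] {0, 0, 0, 3, 1}).size());          // prints 0
        // Second chunk: rest of the first frame plus a complete 1-byte frame.
        System.out.println(acc.feed(new byte[] {2, 3, 0, 0, 0, 1, 9}).size());    // prints 2
    }
}
```

The same pattern applies in the Flex handler: only deserialize once the buffered byte count covers a whole message, and leave any remainder for the next SOCKET_DATA event.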
