
Read an AMF object with flex socket

I'm currently trying to communicate between Java and Flex using sockets and AMF-serialized objects.

On the Java side I use Amf3Input and Amf3Output from BlazeDS (flex-messaging-common.jar and flex-messaging-core.jar).

The connection is established correctly, and when I send an object from Flex to Java, I can read it without trouble:

FLEX side :

protected function button2_clickHandler(event:MouseEvent):void
{
    var tmp:FlexAck = new FlexAck;
    tmp.id="123456789123456789123456789";
    tmp.name="A";
    tmp.source="Aaaaaa";
    tmp.ackGroup=false;
    s.writeObject(tmp);
    s.flush();
}

JAVA side :

ServerSocket servSoc = new ServerSocket(8888);
Socket s = servSoc.accept();
Amf3Output amf3Output = new Amf3Output(SerializationContext.getSerializationContext());
amf3Output.setOutputStream(s.getOutputStream());
Amf3Input amf3Input = new Amf3Input(SerializationContext.getSerializationContext());
amf3Input.setInputStream(s.getInputStream());
while (true)
{
    try
    {
        Object obj = amf3Input.readObject();
        if (obj != null) {
            if (obj instanceof AckOrder) {
                System.out.println(((AckOrder) obj).getId());
            }
        }
    }
    catch (Exception e)
    {
        e.printStackTrace();
        break;
    }
}
amf3Output.close();
amf3Input.close();
servSoc.close();

This direction works perfectly; the problem is reading objects sent from the Java side.

The code I use in Java is:

for(int i=0;i<10;i++){
    ack = new AckOrder(i,"A","B", true);
    amf3Output.writeObject(ack);
    amf3Output.writeObjectEnd();
    amf3Output.flush();
}

I have a handler on ProgressEvent.SOCKET_DATA:

trace((s.readObject() as FlexAck).id);

But I get errors such as "Error #2030: End of File detected" and "Error #2006: Index Out of bound".

If I add some ByteArray manipulation, I manage to read the first object, but not the following ones.

s.readBytes(tmp,tmp.length);
content = clone(tmp);
(content.readObject());
trace("########################## OK OBJECT RECEIVED");
var ack:FlexAck = (tmp.readObject() as FlexAck); 
trace("**********************> id = "+ack.id);

I've spent many hours searching forums and elsewhere, but nothing helped.

So if someone could help me, it would be great.

Thanks

Sylvain

EDIT :

Here is an example that I thought should work but doesn't. I hope it better illustrates what I'm aiming for (a permanent socket connection with an exchange of messages).

Java class :

import java.net.ServerSocket;
import java.net.Socket;
import protocol.AckOrder;
import flex.messaging.io.SerializationContext;
import flex.messaging.io.amf.Amf3Input;
import flex.messaging.io.amf.Amf3Output;


public class Main {
    public static void main(String[] args) {
        while (true)
        {
            try {
                ServerSocket servSoc = new ServerSocket(8888);
                Socket s = servSoc.accept();
                System.out.println("connection accepted");
                Amf3Output amf3Output = new Amf3Output(SerializationContext.getSerializationContext());
                amf3Output.setOutputStream(s.getOutputStream());
                Amf3Input amf3Input = new Amf3Input(SerializationContext.getSerializationContext());
                amf3Input.setInputStream(s.getInputStream());
                while (true)
                {
                    try
                    {
                        System.out.println("Reading object");
                        Object obj = amf3Input.readObject();
                        if (obj != null)
                        {
                            System.out.println(obj.getClass());
                            if (obj instanceof AckOrder)
                            {
                                // Copy the received object and echo it back to the client.
                                AckOrder order = new AckOrder();
                                order.setId(((AckOrder) obj).getId());
                                order.setName(((AckOrder) obj).getName());
                                order.setSource(((AckOrder) obj).getSource());
                                order.setAckGroup(((AckOrder) obj).isAckGroup());
                                System.out.println(((AckOrder) obj).getId());
                                amf3Output.writeObject(order);
                                amf3Output.writeObjectEnd();
                                amf3Output.flush();
                            }
                        }
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                        break;
                    }
                }
                amf3Output.close();
                amf3Input.close();
                servSoc.close();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

Java Serializable object :

package protocol;

import java.io.Serializable;

public class AckOrder implements Serializable {
    private static final long serialVersionUID = 5106528318894546695L;
    private String id;
    private String name;
    private String source;
    private boolean ackGroup = false;

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getSource() {
        return this.source;
    }

    public void setAckGroup(boolean ackGroup) {
        this.ackGroup = ackGroup;
    }

    public boolean isAckGroup() {
        return this.ackGroup;
    }

    public AckOrder()
    {
        super();
    }
}

Flex Side :

Main flex code :

<fx:Script>
    <![CDATA[
        import mx.collections.ArrayCollection;
        import mx.controls.Alert;
        import mx.events.FlexEvent;
        import mx.utils.object_proxy;


        private var _socket:Socket = new Socket();

        private function onCreationComplete():void
        {
            this._socket.connect("localhost",8888);
            this._socket.addEventListener(ProgressEvent.SOCKET_DATA, onData);
        }

        private function onData(e:ProgressEvent):void
        {

            if(this._socket.bytesAvailable)
            {
                this._socket.endian = Endian.LITTLE_ENDIAN;
                var objects:Array = [];
                try{
                    while(this._socket.bytesAvailable > 0)
                    {
                        objects.push(this._socket.readObject());
                    }
                }catch(e:Error){trace(e.message);}
                    trace("|"+(objects)+"|");
            }

        }

        protected function sendButton_clickHandler(event:MouseEvent):void
        {
            var tmp:FlexAck = new FlexAck;
            tmp.id="1";
            tmp.name="A";
            tmp.source="B";
            tmp.ackGroup=false;
            this._socket.writeObject(tmp);
            this._socket.flush();
        }


    ]]>
</fx:Script>
<s:Button x="0" y="0" name="send" label="Send" click="sendButton_clickHandler(event)"/>

Flex serializable object :

package
{

[Bindable]
[RemoteClass(alias="protocol.AckOrder")] 
public class FlexAck
{
    public function FlexAck()
    {
    }

    public var id:String;
    public var name:String;
    public var source:String;
    public var ackGroup:Boolean;

}
}

Edit 25/05/2011 :

I've added those listeners in my flex code :

this._socket.addEventListener(Event.ACTIVATE,onActivate);
this._socket.addEventListener(Event.CLOSE,onClose);
this._socket.addEventListener(Event.CONNECT,onConnect);
this._socket.addEventListener(Event.DEACTIVATE,onDeactivate);
this._socket.addEventListener(IOErrorEvent.IO_ERROR,onIOerror);
this._socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR,onSecurityError);

But there are no errors, and I still can't receive objects correctly.

Sylvain asked Nov 04 '22 21:11


2 Answers

On the server, you have to serialize the AMF data into a byte array first and then write that array to the socket:

final ByteArrayOutputStream baos = new ByteArrayOutputStream();
amf3Output.setOutputStream(baos);
amf3Output.writeObject(order);
amf3Output.flush();
amf3Output.close();
s.getOutputStream().write(baos.toByteArray());

Then

this._socket.readObject()

works as expected!
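To make the pattern in this answer easier to see in isolation, here is a minimal sketch that runs without BlazeDS. The `Serializer` interface, the `PerMessageBuffer` class and the `demo` method are hypothetical names introduced only for illustration; in the real server the serializer step would be the `Amf3Output` calls shown above. It is one reading of the answer, not a definitive implementation.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class PerMessageBuffer {

    /** Hypothetical stand-in for "write one object with Amf3Output and flush". */
    interface Serializer<T> {
        void serialize(T message, OutputStream target) throws IOException;
    }

    /** Serializes one message into its own buffer, then writes the bytes to the socket stream in one call. */
    static <T> void send(T message, Serializer<T> serializer, OutputStream socketOut) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream(); // fresh buffer per message
            serializer.serialize(message, buffer);
            socketOut.write(buffer.toByteArray());
            socketOut.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Sends two messages to an in-memory "socket" and returns what arrived. */
    static String demo() {
        ByteArrayOutputStream fakeSocket = new ByteArrayOutputStream();
        // Assumption: a plain UTF-8 encoder replaces the AMF serializer for this demo.
        Serializer<String> utf8 = (msg, out) -> out.write(msg.getBytes(StandardCharsets.UTF_8));
        send("first", utf8, fakeSocket);
        send("second", utf8, fakeSocket);
        return new String(fakeSocket.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}

The point of the design is that each message is complete in memory before any of it reaches the socket, so the serializer never writes directly to the network stream.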

n0ize answered Nov 07 '22 14:11


The problem is caused by the following:

  1. An AMF stream is stateful. When it serializes objects, it compresses them relative to objects that it has already written.

  2. Compression is achieved by referencing previously sent class descriptions, string values and objects by index (for example, if the first string you sent was "heloWorld", then when you later send that string again, the AMF stream sends string index 0 instead).

  3. Unfortunately, ByteArray and Socket do not maintain reference tables between readObject calls. Even if you keep appending newly received data to the end of the same ByteArray object, each call to readObject instantiates new reference tables and discards the previous ones (so repeated references to the same string within a single object tree should still work).

  4. In your example, you always write the same string values to the properties. So when you send the second object, its string properties are serialized not as strings but as references to the strings in the previously written object.

The solution is to create a new AMF stream for each object you send.
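The mismatch described in the list above can be reproduced with a toy model. This is a sketch under loud assumptions: `ToyWriter` and `ToyReader` are hypothetical classes, and the `=value` / `#index` tokens are invented for the demo and are not the real AMF3 wire format. It only shows why a writer that keeps its string table across messages cannot be paired with a reader that starts from an empty table on every read.

import java.util.ArrayList;
import java.util.List;

final class RefTableDemo {

    /** Stateful writer: a string seen before is emitted as "#<index>" instead of inline. */
    static final class ToyWriter {
        private final List<String> seen = new ArrayList<>();

        String write(String value) {
            int index = seen.indexOf(value);
            if (index >= 0) {
                return "#" + index;   // back-reference into the writer's table
            }
            seen.add(value);
            return "=" + value;       // first occurrence is sent inline
        }
    }

    /** Reader with its own table; a new instance models one readObject call. */
    static final class ToyReader {
        private final List<String> seen = new ArrayList<>();

        String read(String token) {
            if (token.startsWith("#")) {
                int index = Integer.parseInt(token.substring(1));
                if (index >= seen.size()) {
                    throw new IllegalStateException("unknown reference " + token);
                }
                return seen.get(index);
            }
            String value = token.substring(1);
            seen.add(value);
            return value;
        }
    }

    /** One writer reused for both messages, a fresh reader per message: the second read fails. */
    static String sharedWriterFreshReader() {
        ToyWriter writer = new ToyWriter();
        String first = writer.write("A");
        String second = writer.write("A");
        new ToyReader().read(first);
        try {
            return new ToyReader().read(second);
        } catch (IllegalStateException e) {
            return "error: " + e.getMessage();
        }
    }

    /** A fresh writer per message, as the answer suggests: both reads succeed. */
    static String freshWriterFreshReader() {
        String first = new ToyWriter().write("A");
        String second = new ToyWriter().write("A");
        new ToyReader().read(first);
        return new ToyReader().read(second);
    }

    public static void main(String[] args) {
        System.out.println(sharedWriterFreshReader());
        System.out.println(freshWriterFreshReader());
    }
}

In the toy model the cost of the fix is visible too: with a fresh writer per message, the string "A" is sent inline every time, so nothing is saved across messages.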

This is complete rubbish, of course(!) It means we can't really take advantage of the compression in custom protocols. It would be much better if our protocols could decide when to reset these reference tables, perhaps when they got too big.

For example, if you have an RPC protocol, it would be nice to have an AMF stream pass the remote method names as references rather than strings for speed...

I haven't checked, but I think this sort of thing is done by RTMP. The reason it probably hasn't been made available in developer objects like ByteArray and Socket (sigh, I hope this isn't true) is that Adobe wants to push us towards LCDS...

Addendum/edit: just found this, which provides a solution http://code.google.com/p/cvlib/

dwilliams answered Nov 07 '22 13:11