Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Unity Processing Data Stream from Socket

Background

I have a simple working server which sends binary from a data collection device, the socket connection works and both my Unity client and other clients(like matlab client) can receive the data correctly.

Then a client in Unity trying to receive the data from this server and segment the bytes from each packet into 3D coordinates array.

Each frame should be 512x424 pixels large, so that's why I need to wait until the data fills 512x424 bytes and do the segmentation

The Problem

Unity runs fine on at the socket connection and receiving stage, it will get stuck(stall) on the processing stage: ProcessFrame((byte[])state.buffer.Clone())

I have read this SO post: Getting UdpClient Receive data back in the Main thread and changed my code accordingly, but the problem persists.

Am I doing something wrong? Thanks for any help :)

Code --- client

public class SomeClass: MonoBehaviour {
public GameObject sphere;
const int IMGW = 512;
const int IMGH = 424;
const int MAXBODY = 6;
const int NUMJOINT = 25;
const int READ_BUFFER_SIZE = (4 * 3 + 4 * 1 + 4 * 1) * (IMGW * IMGH + MAXBODY * NUMJOINT);
const int PORT_NUM = 20156;
public string response = String.Empty;
private Queue queue;
private System.Object queueLock;
private int bytesRead;
// ManualResetEvent instances signal completion.
private static ManualResetEvent connectDone =
    new ManualResetEvent(false);
private static ManualResetEvent sendDone =
    new ManualResetEvent(false);
private static ManualResetEvent receiveDone =
    new ManualResetEvent(false);

// State object for receiving data from remote device.
public class StateObject
{
    // Client socket.
    public Socket workSocket = null;
    // Receive buffer.
    public const int BufferSize = (4 * 3 + 4 * 1 + 4 * 1) * (IMGW * IMGH + MAXBODY * NUMJOINT);
    public byte[] buffer = new byte[BufferSize];
}

// We use this to keep tasks needed to run in the main thread
private static readonly Queue<Action> tasks = new Queue<Action>();

// Use this for initialization
void Start () {
    queueLock = new object();
    queue = new Queue();
    this.StartClient();

  // Test object      
  sphere = GameObject.CreatePrimitive(PrimitiveType.Sphere);
  sphere.transform.position = new Vector3(0, -1, 10);     
}         
  // Update is called once per frame  
 void Update () {
    this.HandleTasks();
}

void HandleTasks()
{
    while (tasks.Count > 0)
    {
        Action task = null;

        lock (tasks)
        {
            if (tasks.Count > 0)
            {
                task = tasks.Dequeue();
            }
        }

        task();
    }
}

public void QueueOnMainThread(Action task)
{
    lock (tasks)
    {
        tasks.Enqueue(task);
    }
}

private void StartClient()
{
    try
    {
        IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
        IPEndPoint remoteEP = new IPEndPoint(ipAddress, PORT_NUM);

        Socket client = new Socket(AddressFamily.InterNetwork,
            SocketType.Stream, ProtocolType.Tcp);

        // Connect to the remote endpoint.
        client.BeginConnect(remoteEP, new AsyncCallback(ConnectCallback), client);
        connectDone.WaitOne();

        Receive(client);
        receiveDone.WaitOne();

        Console.WriteLine("Response received : {0}", response);

        // Release the socket
        client.Shutdown(SocketShutdown.Both);
        client.Close();
    }
    catch (Exception e)
    {
        Debug.Log(e.ToString());
    }
}

private void ConnectCallback(IAsyncResult ar)
{
    try
    {
        // Retrieve the socket from the state object.
        Socket client = (Socket)ar.AsyncState;

        // Complete the connection.
        client.EndConnect(ar);

        // Signal that the connection has been made.
        connectDone.Set();
    }
    catch (Exception e)
    {
        String error = e.ToString();
        Console.WriteLine(e.ToString());
        fnDisconnect();
    }
}

private void Receive(Socket client)
{
    try
    {
        // Create the state object.
        StateObject state = new StateObject();
        state.workSocket = client;
        bytesRead = 0;
        // Begin receiving the data from the remote device.
        client.BeginReceive(state.buffer, bytesRead, StateObject.BufferSize, 0,
            new AsyncCallback(ReceiveCallback), state);
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

private void ReceiveCallback(IAsyncResult ar)
{
    try
    {
        // Retrieve the state object and the client socket 
        // from the asynchronous state object.
        StateObject state = (StateObject)ar.AsyncState;
        Socket client = state.workSocket;

        // Read data from the remote device.
        int numOfBytesRead = client.EndReceive(ar);

        if (numOfBytesRead > 0)
        {
            bytesRead += numOfBytesRead;
            if (bytesRead == StateObject.BufferSize)
            {
                this.QueueOnMainThread(() =>
                {
                    // All the data has arrived; put it in response.
                    ProcessFrame((byte[])state.buffer.Clone());
                });

                Receive(client);
            }
            else {
                // Get the rest of the data.
                client.BeginReceive(state.buffer, bytesRead, StateObject.BufferSize - bytesRead, 0,
                    new AsyncCallback(ReceiveCallback), state);
            }
        }
        else
        {
            receiveDone.Set();
        }

    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

Code --- Data Processing

private void ProcessFrame(byte[] buffer)
{

    byte[] bufferCopy = (byte[])buffer.Clone();

    double[,,] XYZArray = new double[IMGH, IMGW, 3];
    byte[,] DepthArray = new byte[IMGH, IMGW];
    byte[,,] RGBArray = new byte[IMGH, IMGW, 3];

    for (int i = 0; i < IMGW; i++)
    {
        for (int j = 0; j < IMGH; j++)
        {
            int index = (i * IMGW + j) * 20;
            //byte[] arr = {bufferCopy[index], bufferCopy[index + 1], bufferCopy[index + 2], bufferCopy[index + 3] };
            float v = System.BitConverter.ToSingle(bufferCopy, index);
            if (!float.IsInfinity(v) && !float.IsNaN(v))
            {
                XYZArray[i, j, 0] = v;
            }
            //arr = new byte[]{bufferCopy[index + 4], bufferCopy[index + 5], bufferCopy[index + 6], bufferCopy[index + 7] };
            v = System.BitConverter.ToSingle(bufferCopy, index + 4);
            if (!float.IsInfinity(v) && !float.IsNaN(v))
            {
                XYZArray[i, j, 1] = v;
            }
            v = System.BitConverter.ToSingle(bufferCopy, index + 8);
            if (!float.IsInfinity(v) && !float.IsNaN(v))
            {
                XYZArray[i, j, 2] = v;
            }
            //                            Debug.Log("for loop called");
            DepthArray[i, j] = bufferCopy[index + 12];
            RGBArray[i, j, 2] = bufferCopy[index + 16]; // B
            RGBArray[i, j, 1] = bufferCopy[index + 17]; // G
            RGBArray[i, j, 0] = bufferCopy[index + 18]; // R
        }
    }
    this.EnQueue(XYZArray);
}

private void EnQueue(System.Object obj)
{
    lock (queueLock)
    {
        queue.Enqueue(obj);
    }
}

private bool DeQueue(System.Object outObj)
{
    bool success = false;
    lock (queueLock)
    {
        if (queue.Count > 0)
        {
            outObj = queue.Dequeue();
            success = true;
        }
    }
    return success;
}

public int lengthOfQueue()
{
    int count = -1;
    lock (queueLock)
    {
        count = queue.Count;
    }
    return count;
}

public double[,,] getXYZArray()
{
    double[,,] retVal = new double[,,] { };
    this.DeQueue(retVal);
    return retVal;
}

UPDATE

Thanks for @Programmer 's advice and I followed the link (s)he provided got a working socket client.

like image 306
dumbfingers Avatar asked Feb 11 '26 09:02

dumbfingers


1 Answers

The whole code is a mess. You should have used Thread instead of Async to accomplish that with less code.

Any ways, replace the byte[] bufferCopy = (byte[])buffer.Clone(); line of code with

byte[] bufferCopy = new byte[buffer.Length];
System.Buffer.BlockCopy(buffer, 0, bufferCopy, 0, bufferCopy.Length);

And for ProcessFrame((byte[])state.buffer.Clone());, simply pass in the data without cloning it. So that should be replaced with

ProcessFrame(state.buffer);

This should solve the problem, assuming that is the only problem in your code.

EDIT:

Here a complete TCP server code for Unity. Port the code to UDP and that should work for you.

like image 88
Programmer Avatar answered Feb 14 '26 00:02

Programmer



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!