Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Data corruption Piping between C++ and Python

I am writing some code that takes binary data from Python, Pipes it to C++, does some processing on the data, (in this case calculating a mutual information metric) and then pipes the results back to python. While testing I have found that everything works fine if the data I send is a set of 2 arrays with dimensions less than 1500 X 1500, but if I send 2 arrays that are 2K X 2K I get back a lot of corrupted nonsense.

I currently believe the algorithmic portion of the code is fine because it provides the expected answers during testing with small (<=1500 X1500) arrays. That leads me to believe that this is an issue with either the stdin or stdout piping. That maybe I’m passing some intrinsic limit somewhere.

The Python Code and C++ code are below.

Python Code:

import subprocess
import struct
import sys
import numpy as np

#set up the variables needed 
bytesPerDouble = 8
sizeX = 2000
sizeY = 2000
offset = sizeX*sizeY
totalBytesPerArray = sizeX*sizeY*bytesPerDouble
totalBytes = totalBytesPerArray*2                   #the 2 is because we pass 2 different versions of the 2D array

#setup the testing data array 
a = np.zeros(sizeX*sizeY*2, dtype='d')
for i in range(sizeX):
    for j in range(sizeY):
        a[j+i*sizeY] = i
        a[j+i*sizeY+offset] = i
        if i % 10 == 0:
            a[j+i*sizeY+offset] = j

data = a.tobytes('C')      

strTotalBytes = str(totalBytes)
strLineBytes  = str(sizeY*bytesPerDouble)

#communicate with c++ code
print("starting C++ code")     
command =   "C:\Python27\PythonPipes.exe"
proc = subprocess.Popen([command, strTotalBytes, strLineBytes, str(sizeY), str(sizeX)], stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE)

ByteBuffer = (data)
proc.stdin.write(ByteBuffer)

print("Reading results back from C++")
for i in range(sizeX):
    returnvalues = proc.stdout.read(sizeY*bytesPerDouble)
    a = buffer(returnvalues)
    b = struct.unpack_from(str(sizeY)+'d', a)
    print str(b) + " " + str(i)

print('done')

C++ Code: Main function:

int main(int argc, char **argv) {
    int count = 0;
    long totalbytes = stoi(argv[argc-4], nullptr,10);       //bytes being transfered
    long bytechunk = stoi(argv[argc - 3], nullptr, 10); //bytes being transfered at a time
    long height = stoi(argv[argc-2], nullptr, 10);  //bytes being transfered at a time
    long width  = stoi(argv[argc-1], nullptr, 10);  //bytes being transfered at a time
    long offset = totalbytes / sizeof(double) / 2;


    data = new double[totalbytes/sizeof(double)];
    int columnindex = 0;
    //read in data from pipe
    while (count<totalbytes) {

        fread(&(data[columnindex]), 1, bytechunk, stdin);
        columnindex += bytechunk / sizeof(double);
        count += bytechunk;

    }


    //calculate the data transform
    MutualInformation MI = MutualInformation();
    MI.Initialize(data, height, width, offset);
    MI.calcMI();
    count = 0;
    //*
    //write out data to pipe
    columnindex = 0;
    while (count<totalbytes/2) {

        fwrite(&(MI.getOutput()[columnindex]), 1, bytechunk, stdout);
        fflush(stdout);
        count += bytechunk;
        columnindex += bytechunk/sizeof(double);
    }
    //*/
    delete [] data;

    return 0;
}

and in case you need it the actual processing code:

double MutualInformation::calcMI(){
    double rvalue = 0.0;
    std::map<int, map<int, double>> lHistXY = map<int, map<int, double>>();
    std::map<int, double> lHistX = map<int, double>();
    std::map<int, double> lHistY = map<int, double>();
    typedef std::map<int, std::map<int, double>>::iterator HistXY_iter;
    typedef std::map<int, double>::iterator HistY_iter;

    //calculate Entropys and MI
    double MI = 0.0;
    double Hx = 0.0;
    double Hy = 0.0;
    double Px = 0.0;
    double Py = 0.0;
    double Pxy = 0.0;

    //scan through the image
    int ip = 0;
    int jp = 0;
    int chipsize = 3;

    //setup zero array
    double * zeros = new double[this->mHeight];
    for (int j = 0; j < this->mHeight; j++){
        zeros[j] = 0.0;
    }

    //zero out Output array
    for (int i = 0; i < this->mWidth; i++){
        memcpy(&(this->mOutput[i*this->mHeight]), zeros, this->mHeight*8);
    }


    double index = 0.0;
    for (int ioutter = chipsize; ioutter < (this->mWidth - chipsize); ioutter++){
        //write out processing status
        //index = (double)ioutter;
        //fwrite(&index, 8, 1, stdout);
        //fflush(stdout);
        //*
        for (int j = chipsize; j < (this->mHeight - chipsize); j++){

            //clear the histograms
            lHistX.clear();
            lHistY.clear();
            lHistXY.clear();
            //chip out a section of the image
            for (int k = -chipsize; k <= chipsize; k++){
                for (int l = -chipsize; l <= chipsize; l++){
                    ip = ioutter + k;
                    jp = j + l;
                    //update X histogram
                    if (lHistX.count(int(this->mData[ip*this->mHeight + jp]))){
                        lHistX[int(this->mData[ip*this->mHeight + jp])] += 1.0;
                    }else{
                        lHistX[int(this->mData[ip*this->mHeight + jp])] = 1.0;

                    }
                    //update Y histogram
                    if (lHistY.count(int(this->mData[ip*this->mHeight + jp+this->mOffset]))){
                        lHistY[int(this->mData[ip*this->mHeight + jp+this->mOffset])] += 1.0;
                    }
                    else{
                        lHistY[int(this->mData[ip*this->mHeight + jp+this->mOffset])] = 1.0;

                    }

                    //update X and Y Histogram
                    if (lHistXY.count(int(this->mData[ip*this->mHeight + jp]))){ 
                        //X Key exists check if Y key exists
                        if (lHistXY[int(this->mData[ip*this->mHeight + jp])].count(int(this->mData[ip*this->mHeight + jp + this->mOffset]))){
                            //X & Y keys exist
                            lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] += 1;
                        }else{
                            //X exist but Y doesn't
                            lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] = 1;
                        }
                    }else{
                        //X Key Didn't exist
                        lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] = 1;
                    };
                }
            }

            //calculate PMI, Hx, Hy
            // iterator->first = key
            // iterator->second = value

             MI = 0.0;
             Hx = 0.0;
             Hy = 0.0;

            for (HistXY_iter Hist2D_iter = lHistXY.begin(); Hist2D_iter != lHistXY.end(); Hist2D_iter++) {

                Px = lHistX[Hist2D_iter->first] / ((double) this->mOffset);
                Hx -= Px*log(Px);

                for (HistY_iter HistY_iter = Hist2D_iter->second.begin(); HistY_iter != Hist2D_iter->second.end(); HistY_iter++) {
                    Py = lHistY[HistY_iter->first] / ((double) this->mOffset);
                    Hy -= Py*log(Py);
                    Pxy = HistY_iter->second / ((double) this->mOffset);
                    MI += Pxy*log(Pxy / Py / Px);
                }
            }

            //normalize PMI to max(Hx,Hy) so that the PMI value runs from 0 to 1
            if (Hx >= Hy && Hx > 0.0){
                MI /= Hx;
            }else if(Hy > Hx && Hy > 0.0){
                MI /= Hy;
            }
            else{
                MI = 0.0;
            }

            //write PMI to data output array
            if (MI < 1.1){
                this->mOutput[ioutter*this->mHeight + j] = MI;
            }
            else{
                this->mOutput[ioutter*this->mHeight + j] = 0.0;

            }

        }



    }

    return rvalue;
}

with arrays that return something that makes sense I get output bounded between 0 and 1 like this:

(0.0, 0.0, 0.0, 0.7160627908692593, 0.6376472316395495, 0.5728801401524277,...

with the 2Kx2K or higher arrays I get nonesense like this (even though the code clamps the values between 0 and 1):

(-2.2491400820412374e+228, -2.2491400820412374e+228, -2.2491400820412374e+228, -2.2491400820412374e+228, -2.2491400820412374e+228,...

I would like to know why this code is corrupting the data set after it is assigned between 0.0 and 1, and whether or not it is a piping issue, a stdin/stdout issue, a buffer issue of some sort, or a coding issue I am simply not seeing.

Update I tried passing the data in smaller chunks using the code that Chris suggested with no luck. also of note is that I added a catch for ferror on stdout and it never got tripped so I am pretty sure that the bytes are at least making it to stdout. Is it possible that something else is writing to stdout somehow? maybe an extra byte making its way into stdout while my program is running? I find this doubtful as the errors are appearing consistently on the 4th fwrite read in the 10th entry.

Per Craig's request here is the full C++ code (the full Python Code is already posted): it is sitting in 3 files:

main.cpp

#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <iostream>
#include "./MutualInformation.h"

double * data;
using namespace std;

void
xxwrite(unsigned char *buf, size_t wlen, FILE *fo)
{
    size_t xlen;

    for (; wlen > 0; wlen -= xlen, buf += xlen) {
        xlen = wlen;
        if (xlen > 1024)
            xlen = 1024;
        xlen = fwrite(buf, 1, xlen, fo);
        fflush(fo);
    }
}

int main(int argc, char **argv) {
    int count = 0;
    long totalbytes = stoi(argv[argc-4], nullptr,10);       //bytes being transfered
    long bytechunk = stoi(argv[argc - 3], nullptr, 10); //bytes being transfered at a time
    long height = stoi(argv[argc-2], nullptr, 10);  //bytes being transfered at a time
    long width  = stoi(argv[argc-1], nullptr, 10);  //bytes being transfered at a time
    long offset = totalbytes / sizeof(double) / 2;


    data = new double[totalbytes/sizeof(double)];
    int columnindex = 0;
    //read in data from pipe
    while (count<totalbytes) {

        fread(&(data[columnindex]), 1, bytechunk, stdin);
        columnindex += bytechunk / sizeof(double);
        count += bytechunk;

    }


    //calculate the data transform
    MutualInformation MI = MutualInformation();
    MI.Initialize(data, height, width, offset);
    MI.calcMI();
    count = 0;

    columnindex = 0;
    while (count<totalbytes/2) {

        xxwrite((unsigned char*)&(MI.getOutput()[columnindex]),  bytechunk, stdout);
        count += bytechunk;
        columnindex += bytechunk/sizeof(double);
    }
    delete [] data;

    return 0;
}

MutualInformation.h

#include <map>

using namespace std;

class MutualInformation
{
private:
    double * mData;
    double * mOutput;
    long mHeight;
    long mWidth;
    long mOffset;

public:
    MutualInformation();
    ~MutualInformation();
    bool Initialize(double * data, long Height, long Width, long Offset);
    const double * getOutput();

    double calcMI();

};

MutualInformation.cpp

#include "MutualInformation.h"


MutualInformation::MutualInformation()
{
    this->mData = nullptr;
    this->mOutput = nullptr;
    this->mHeight = 0;
    this->mWidth = 0;

}


MutualInformation::~MutualInformation()
{
    delete[] this->mOutput;
}

bool MutualInformation::Initialize(double * data, long Height, long Width, long Offset){
    bool rvalue = false;
    this->mData = data;
    this->mHeight = Height;
    this->mWidth = Width;
    this->mOffset = Offset;


    //allocate output data
    this->mOutput = new double[this->mHeight*this->mWidth];

    return rvalue;
}

const double * MutualInformation::getOutput(){
    return this->mOutput;
}


double MutualInformation::calcMI(){
    double rvalue = 0.0;
    std::map<int, map<int, double>> lHistXY = map<int, map<int, double>>();
    std::map<int, double> lHistX = map<int, double>();
    std::map<int, double> lHistY = map<int, double>();
    typedef std::map<int, std::map<int, double>>::iterator HistXY_iter;
    typedef std::map<int, double>::iterator HistY_iter;

    //calculate Entropys and MI
    double MI = 0.0;
    double Hx = 0.0;
    double Hy = 0.0;
    double Px = 0.0;
    double Py = 0.0;
    double Pxy = 0.0;

    //scan through the image
    int ip = 0;
    int jp = 0;
    int chipsize = 3;

    //setup zero array
    double * zeros = new double[this->mHeight];
    for (int j = 0; j < this->mHeight; j++){
        zeros[j] = 0.0;
    }

    //zero out Output array
    for (int i = 0; i < this->mWidth; i++){
        memcpy(&(this->mOutput[i*this->mHeight]), zeros, this->mHeight*8);
    }


    double index = 0.0;
    for (int ioutter = chipsize; ioutter < (this->mWidth - chipsize); ioutter++){

        for (int j = chipsize; j < (this->mHeight - chipsize); j++){

            //clear the histograms
            lHistX.clear();
            lHistY.clear();
            lHistXY.clear();
            //chip out a section of the image
            for (int k = -chipsize; k <= chipsize; k++){
                for (int l = -chipsize; l <= chipsize; l++){
                    ip = ioutter + k;
                    jp = j + l;
                    //update X histogram
                    if (lHistX.count(int(this->mData[ip*this->mHeight + jp]))){
                        lHistX[int(this->mData[ip*this->mHeight + jp])] += 1.0;
                    }else{
                        lHistX[int(this->mData[ip*this->mHeight + jp])] = 1.0;

                    }
                    //update Y histogram
                    if (lHistY.count(int(this->mData[ip*this->mHeight + jp+this->mOffset]))){
                        lHistY[int(this->mData[ip*this->mHeight + jp+this->mOffset])] += 1.0;
                    }
                    else{
                        lHistY[int(this->mData[ip*this->mHeight + jp+this->mOffset])] = 1.0;

                    }

                    //update X and Y Histogram
                    if (lHistXY.count(int(this->mData[ip*this->mHeight + jp]))){ 
                        //X Key exists check if Y key exists
                        if (lHistXY[int(this->mData[ip*this->mHeight + jp])].count(int(this->mData[ip*this->mHeight + jp + this->mOffset]))){
                            //X & Y keys exist
                            lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] += 1;
                        }else{
                            //X exist but Y doesn't
                            lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] = 1;
                        }
                    }else{
                        //X Key Didn't exist
                        lHistXY[int(this->mData[ip*this->mHeight + jp])][int(this->mData[ip*this->mHeight + jp + this->mOffset])] = 1;
                    };
                }
            }

            //calculate PMI, Hx, Hy
            // iterator->first = key
            // iterator->second = value

             MI = 0.0;
             Hx = 0.0;
             Hy = 0.0;

            for (HistXY_iter Hist2D_iter = lHistXY.begin(); Hist2D_iter != lHistXY.end(); Hist2D_iter++) {

                Px = lHistX[Hist2D_iter->first] / ((double) this->mOffset);
                Hx -= Px*log(Px);

                for (HistY_iter HistY_iter = Hist2D_iter->second.begin(); HistY_iter != Hist2D_iter->second.end(); HistY_iter++) {
                    Py = lHistY[HistY_iter->first] / ((double) this->mOffset);
                    Hy -= Py*log(Py);
                    Pxy = HistY_iter->second / ((double) this->mOffset);
                    MI += Pxy*log(Pxy / Py / Px);
                }
            }

            //normalize PMI to max(Hx,Hy) so that the PMI value runs from 0 to 1
            if (Hx >= Hy && Hx > 0.0){
                MI /= Hx;
            }else if(Hy > Hx && Hy > 0.0){
                MI /= Hy;
            }
            else{
                MI = 0.0;
            }

            //write PMI to data output array
            if (MI < 1.1){
                this->mOutput[ioutter*this->mHeight + j] = MI;
            }
            else{
                this->mOutput[ioutter*this->mHeight + j] = 0.0;
                //cout << "problem with output";
            }

        }



    }



    //*/
    return rvalue;
}

SOLVED By 6502

6502's answer below solved my problem. I needed to explicitly tell Windows to use a binary mode for stdin / stdout. to do that I had to include 2 new header files in my main cpp file.

#include <fcntl.h>
#include <io.h>

add the following lines of code (modified away from 6502's POSIX versions because Visual Studio complained) to the beginning of my main function

_setmode(_fileno(stdout), O_BINARY);
_setmode(_fileno(stdin), O_BINARY);

and then add these lines to my Python code:

import os, msvcrt
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
like image 289
Semicolons and Duct Tape Avatar asked May 02 '16 16:05

Semicolons and Duct Tape


2 Answers

The problem is that stdin/stdout in windows are opened in text mode, not in binary mode and therefore will mess up when the character 13 (\r) is sent.

You can set for example binary mode in Python with

import os, msvcrt
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)

and in C++ with

_setmode(fileno(stdout), O_BINARY);
_setmode(fileno(stdin), O_BINARY);

See https://msdn.microsoft.com/en-us/library/tw4k6df8.aspx

like image 180
6502 Avatar answered Sep 30 '22 23:09

6502


Your C++ fwrite code does not account for getting a "short" transfer.

Here's a slight tweak:

//write out data to pipe
columnindex = 0;
while (count < totalbytes / 2) {
    wlen = fwrite(&(MI.getOutput()[columnindex]), 1, bytechunk, stdout);
    fflush(stdout);
    count += wlen;
    columnindex += wlen / sizeof(double);
}

Note: You still need to be careful as this would still have issues if wlen returns and it's not a multiple of sizeof(double). For example, if bytechunk were 16 and wlen came back with 14, you'd need an additional fwrite with length 2 before continuing the loop. A generalization of this is just to treat the entire data matrix as a giant byte buffer and loop on that.

Actually, you'll get about the same efficiency with many much smaller transfers that are capped by a fixed (i.e. "known safe amount") of [say] 1024 bytes. This works because the output is a byte stream.

Here's a slightly more general solution that I've often used:

void
xxwrite(void *buf,size_t wlen,FILE *fo)
{
    size_t xlen;

    for (;  wlen > 0;  wlen -= xlen, buf += xlen) {
        xlen = wlen;
        if (xlen > 1024)
            xlen = 1024;
        xlen = fwrite(buf,1,xlen,fo);
        fflush(fo);
    }
}

//write out data to pipe
columnindex = 0;
while (count < totalbytes / 2) {
    xxwrite(&(MI.getOutput()[columnindex]), bytechunk, stdout);
    count += bytechunk;
    columnindex += bytechunk / sizeof(double);
}

UPDATE:

I've downloaded all your code and run it. I've got good news and bad news: The code runs fine here, even for a matrix size above 3000. I ran it both using xxwrite and without and the results were the same.

Using my limited python skills, I added some pretty print to your python script (e.g. some line wrap) and had it check every value for range and annotate any bad values. There were none found by the script. Also, visual inspection of the values turned up nothing [this was true before the pretty print, so it hasn't introduced anything]. Just lots of zeros and then blocks in the 0.9 range.

The only difference I can see is that I'm using gcc [and, of course, python] on linux. But, from your script it seems your using Windows [based on the C:\... path for your C++ executable. This shouldn't matter for this application, but I mention it anyway.

So, pipes work here. One thing you might try is to direct the C++ output to a file. Then, have the script read back from the file (i.e. no pipe) and see if that makes a difference. I tend to think not, but ...

Also, I don't know what compiler and python implementation you're using under Windows. Whenever I have to do this, I usually have Cygwin installed as it gives one of the closest implementations of linux/Unix-like environment (i.e. pipes are more likely to work as advertised).

Anyway, here's the modified script. Also note that I added os.getenv to grab alternate matrix sizes and an alternate place for the C++ executable, so that it would work for both of us with minimal pain

#!/usr/bin/python

import subprocess
import struct
import sys
import os
import numpy as np

val = os.getenv("MTX","2000")
sizeX = int(val)
sizeY = sizeX
print "sizeX=%d sizeY=%d" % (sizeX,sizeY)

#set up the variables needed
bytesPerDouble = 8
offset = sizeX*sizeY
totalBytesPerArray = sizeX*sizeY*bytesPerDouble
totalBytes = totalBytesPerArray*2                   #the 2 is because we pass 2 different versions of the 2D array

#setup the testing data array
a = np.zeros(sizeX*sizeY*2, dtype='d')
for i in range(sizeX):
    for j in range(sizeY):
        a[j+i*sizeY] = i
        a[j+i*sizeY+offset] = i
        if i % 10 == 0:
            a[j+i*sizeY+offset] = j

data = a.tobytes('C')

strTotalBytes = str(totalBytes)
strLineBytes  = str(sizeY*bytesPerDouble)

#communicate with c++ code
print("starting C++ code")

command = os.getenv("CPGM",None);
if command is None:
    command =   "C:\Python27\PythonPipes.exe"

proc = subprocess.Popen([command, strTotalBytes, strLineBytes, str(sizeY), str(sizeX)], stdin=subprocess.PIPE,stderr=subprocess.PIPE,stdout=subprocess.PIPE)

ByteBuffer = (data)
proc.stdin.write(ByteBuffer)

def prt(i,b):

    hangflg = 0
    per = 8

    for j in range(0,len(b)):
        if ((j % per) == 0):
            print("[%d,%d]" % (i,j)),

        q = b[j]
        print(q),
        hangflg = 1

        if (q < 0.0) or (q > 1.0):
            print("=WTF"),

        if ((j % per) == (per - 1)):
            print("")
            hangflg = 0

    if (hangflg):
        print("")

print("Reading results back from C++")
for i in range(sizeX):
    returnvalues = proc.stdout.read(sizeY*bytesPerDouble)
    a = buffer(returnvalues)
    b = struct.unpack_from(str(sizeY)+'d', a)
    prt(i,b)
    ###print str(b) + " " + str(i)
    ###print str(i) + ": " + str(b)

print('done')
like image 34
Craig Estey Avatar answered Oct 01 '22 00:10

Craig Estey