How to eat bytes flowing out of a stream?

i am fixing a ZIP library class. Internally, nearly all ZIP implementations use DEFLATE compression (RFC1951).

The problem is that, in Delphi, i don't have access to any DEFLATE compression libraries. But the one thing we do have plenty of is ZLIB compression code (RFC1950). It even ships with Delphi, and there are a half-dozen other implementations floating around.

Internally, ZLIB also uses DEFLATE for compression. So i want to do what everyone has done - use the Delphi zlib library for its DEFLATE compression functionality.

The problem is that ZLIB adds a 2-byte prefix and a 4-byte trailer to the DEFLATE data:

[CMF]                1 byte
[FLG]                1 byte
[...deflate compressed data...]
[Adler-32 checksum]  4 bytes
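A small sketch of what those six bytes are, assuming no preset dictionary. A plain compression stream does not use one, so the FDICT bit of FLG is clear; if it were set, a 4-byte DICTID would follow FLG and the prefix would be 6 bytes. Per RFC 1950, the low nibble of CMF is 8 for DEFLATE, the 16-bit value CMF*256 + FLG is a multiple of 31, and the trailer is the Adler-32 checksum of the *uncompressed* data, stored most-significant byte first. The function names below are illustrative only, not part of any Delphi unit; the program is written to compile under Delphi or Free Pascal.

```pascal
program ZlibWrapperDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ELSE}{$APPTYPE CONSOLE}{$ENDIF}
uses
  SysUtils;

// True when CMF/FLG are a valid RFC 1950 header for DEFLATE with no preset
// dictionary, i.e. the case where the zlib prefix is exactly 2 bytes long.
function IsZlibHeader(CMF, FLG: Byte): Boolean;
begin
  Result := ((CMF and $0F) = 8)                   // CM = 8 means DEFLATE
    and ((Integer(CMF) * 256 + FLG) mod 31 = 0)   // FCHECK makes this a multiple of 31
    and ((FLG and $20) = 0);                      // FDICT clear: no 4-byte DICTID follows
end;

// Adler-32 (RFC 1950, section 8.2) of the uncompressed data;
// zlib appends it big-endian as the 4-byte trailer.
function Adler32(const Data: array of Byte): Cardinal;
const
  ModAdler = 65521; // largest prime below 65536
var
  A, B: Cardinal;
  I: Integer;
begin
  A := 1;
  B := 0;
  for I := 0 to High(Data) do
  begin
    A := (A + Data[I]) mod ModAdler;
    B := (B + A) mod ModAdler;
  end;
  Result := (B shl 16) or A;
end;

// Demo helper: copy an AnsiString into a byte array.
function ToBytes(const S: AnsiString): TBytes;
var
  I: Integer;
begin
  SetLength(Result, Length(S));
  for I := 1 to Length(S) do
    Result[I - 1] := Ord(S[I]);
end;

begin
  WriteLn(IsZlibHeader($78, $9C));                             // TRUE: zlib's usual default-level header
  WriteLn(IntToHex(Int64(Adler32(ToBytes('Wikipedia'))), 8));  // 11E60398
end.
```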

So what i need is a way to use the standard TCompressionStream (or TZCompressionStream, or TZCompressionStreamEx, depending on the source code you're using) to compress data:

procedure CompressDataToTargetStream(sourceStream: TStream; targetStream: TStream);
var
   compressor: TCompressionStream;
begin
   compressor := TCompressionStream.Create(clDefault, targetStream); //clDefault = CompressionLevel
   try
      compressor.CopyFrom(sourceStream, sourceStream.Size)
   finally
      compressor.Free; 
   end;
end;

And that works, except that it writes out the leading 2 bytes and the trailing 4 bytes; i need to strip those.

So i wrote a TByteEaterStream:

TByteEaterStream = class(TStream)
public
   constructor Create(TargetStream: TStream; 
         LeadingBytesToEat, TrailingBytesToEat: Integer);
end;

for example

procedure CompressDataToTargetStream(sourceStream: TStream; targetStream: TStream);
var
   byteEaterStream: TByteEaterStream;
   compressor: TCompressionStream;
begin
   byteEaterStream := TByteEaterStream.Create(targetStream, 2, 4); //2 leading bytes, 4 trailing bytes
   try
      compressor := TCompressionStream.Create(clDefault, byteEaterStream); //clDefault = CompressionLevel
      try
         compressor.CopyFrom(sourceStream, sourceStream.Size)
      finally
         compressor.Free; 
      end;
   finally
      byteEaterStream.Free;
   end;
end;

This stream overrides the write method. It's trivial to eat the first 2 bytes. The trick was to eat the trailing 4 bytes.

The eater stream has a 4-byte array, and i always hold the last four bytes of every write in the buffer. When the EaterStream is destroyed, the trailing four bytes go with it.

The problem is that shuffling a few million writes through this buffer is killing performance. The typical use upstream is:

for each of a million data rows
    stream.Write(s[1], Length(s)*SizeOf(Char)); //30-90 character string

i definitely don't want the upstream user to have to indicate that "the end is near". i just want it to be faster.

The Question

Watching a stream of bytes flow by, what is the best way to hold back the last four bytes, given that you don't know which write will be the last?

The code that i'm fixing wrote the entire compressed version into a TStringStream, and then only grabbed the 900MB - 6 bytes to get at the internal DEFLATE data:

cs := TStringStream.Create('');
....write compressed data to cs
S := Copy(CS.DataString, 3, Length(CS.DataString) - 6);

Except that runs the user out of memory. Initially i changed it to write to a TFileStream, so that i could perform the same trick.

But i want the better solution; the stream solution. i want the data to go into the final stream compressed, without any intermediate storage.

My implementation

Not that it helps anything, because i'm not necessarily asking for a system that even uses an adapting stream to do the trimming:

TByteEaterStream = class(TStream)
private
    FTargetStream: TStream;
    FTargetStreamOwnership: TStreamOwnership;
    FLeadingBytesToEat: Integer;
    FTrailingBytesToEat: Integer;
    FLeadingBytesRemaining: Integer;

    FBuffer: array of Byte;
    FValidBufferLength: Integer;
public
    constructor Create(TargetStream: TStream; LeadingBytesToEat, TrailingBytesToEat: Integer; StreamOwnership: TStreamOwnership=soReference);
    destructor Destroy; override;

    class procedure SelfTest;

    procedure Flush;

    function Read(var Buffer; Count: Longint): Longint; override;
    function Write(const Buffer; Count: Longint): Longint; override;
    function Seek(Offset: Longint; Origin: Word): Longint; override;
end;

{ TByteEaterStream }

constructor TByteEaterStream.Create(TargetStream: TStream; LeadingBytesToEat, TrailingBytesToEat: Integer; StreamOwnership: TStreamOwnership=soReference);
begin
    inherited Create;

    //User requested state
    FTargetStream := TargetStream;
    FTargetStreamOwnership := StreamOwnership;
    FLeadingBytesToEat := LeadingBytesToEat;
    FTrailingBytesToEat := TrailingBytesToEat;

    //internal housekeeping
    FLeadingBytesRemaining := FLeadingBytesToEat;

    SetLength(FBuffer, FTrailingBytesToEat);
    FValidBufferLength := 0;
end;

destructor TByteEaterStream.Destroy;
begin
    if FTargetStreamOwnership = soOwned then
        FTargetStream.Free;
    FTargetStream := nil;

    inherited;
end;

procedure TByteEaterStream.Flush;
begin
    if FValidBufferLength > 0 then
    begin
        FTargetStream.Write(FBuffer[0], FValidBufferLength);
        FValidBufferLength  := 0;
    end;
end;

function TByteEaterStream.Write(const Buffer; Count: Integer): Longint;
var
    newStart: Pointer;
    totalCount: Integer;
    addIndex: Integer;
    bufferValidLength: Integer;
    bytesToWrite: Integer;
begin
    Result := Count;

    if Count = 0 then
        Exit;

    if FLeadingBytesRemaining > 0 then
    begin
        newStart := Addr(Buffer);
        Inc(PByte(newStart)); //skip one byte; a PByte cast is also 64-bit safe, unlike Cardinal
        Dec(Count);
        Dec(FLeadingBytesRemaining);
        Result := Self.Write(newStart^, Count)+1; //tell the upstream guy that we wrote it

        Exit;
    end;

    if FTrailingBytesToEat > 0 then
    begin
        if (Count < FTrailingBytesToEat) then
        begin
            //There's less bytes incoming than an entire buffer
            //But the buffer might overfloweth
            totalCount := FValidBufferLength+Count;

            //If it could all fit in the buffer, then let it
            if (totalCount <= FTrailingBytesToEat) then
            begin
                Move(Buffer, FBuffer[FValidBufferLength], Count);
                FValidBufferLength := totalCount;
            end
            else
            begin
                //We're going to overflow the buffer.

                //Purge from the buffer the amount that would get pushed
                FTargetStream.Write(FBuffer[0], totalCount-FTrailingBytesToEat);

                //Shuffle the buffer down (overlapped move)
                bufferValidLength := FValidBufferLength - (totalCount-FTrailingBytesToEat);
                Move(FBuffer[totalCount-FTrailingBytesToEat], FBuffer[0], bufferValidLength);

                addIndex := bufferValidLength; //where we will add the data to
                Move(Buffer, FBuffer[addIndex], Count);
                FValidBufferLength := FTrailingBytesToEat; //the buffer is now exactly full
            end;
        end
        else if (Count = FTrailingBytesToEat) then
        begin
            //The incoming bytes exactly fill the buffer. Flush what we have and eat the incoming amounts
            Flush;
            Move(Buffer, FBuffer[0], FTrailingBytesToEat);
            FValidBufferLength := FTrailingBytesToEat;
            Result := FTrailingBytesToEat; //we "wrote" n bytes
        end
        else
        begin
            //Count is greater than trailing buffer eat size
            Flush;

            //Write the data that definitely not to be eaten
            bytesToWrite := Count-FTrailingBytesToEat;
            FTargetStream.Write(Buffer, bytesToWrite);

            //Buffer the remainder
            newStart := Addr(Buffer);
            Inc(PByte(newStart), bytesToWrite);

            Move(newStart^, FBuffer[0], FTrailingBytesToEat);
            FValidBufferLength := FTrailingBytesToEat;
        end;
    end
    else
        FTargetStream.Write(Buffer, Count); //nothing to hold back; pass it straight through
end;

function TByteEaterStream.Seek(Offset: Integer; Origin: Word): Longint;
begin
    //what does it mean if they want to seek around when i'm supposed to be eating data?
    //i don't know; so results are, by definition, undefined. Don't use at your own risk
    Result := FTargetStream.Seek(Offset, Origin);
end;

function TByteEaterStream.Read(var Buffer; Count: Integer): Longint;
begin
    //what does it mean if they want to read back bytes when i'm supposed to be eating data?
    //i don't know; so results are, by definition, undefined. Don't use at your own risk
    Result := FTargetStream.Read({var}Buffer, Count);
end;

class procedure TByteEaterStream.SelfTest;

    procedure CheckEquals(Expected, Actual: string; Message: string);
    begin
        if Actual <> Expected then
            raise Exception.CreateFmt('TByteEaterStream self-test failed. Expected "%s", but was "%s". Message: %s', [Expected, Actual, Message]);
    end;

    procedure Test(const InputString: string; ExpectedString: string);
    var
        s: TStringStream;
        eater: TByteEaterStream;
    begin
        s := TStringStream.Create('');
        try
            eater := TByteEaterStream.Create(s, 2, 4, soReference);
            try
                eater.Write(InputString[1], Length(InputString));
            finally
                eater.Free;
            end;
            CheckEquals(ExpectedString, s.DataString, InputString);
        finally
            s.Free;
        end;
    end;
begin
    Test('1', '');
    Test('11', '');
    Test('113', '');
    Test('1133', '');
    Test('11333', '');
    Test('113333', '');
    Test('11H3333', 'H');
    Test('11He3333', 'He');
    Test('11Hel3333', 'Hel');
    Test('11Hell3333', 'Hell');
    Test('11Hello3333', 'Hello');
    Test('11Hello,3333', 'Hello,');
    Test('11Hello, 3333', 'Hello, ');
    Test('11Hello, W3333', 'Hello, W');
    Test('11Hello, Wo3333', 'Hello, Wo');
    Test('11Hello, Wor3333', 'Hello, Wor');
    Test('11Hello, Worl3333', 'Hello, Worl');
    Test('11Hello, World3333', 'Hello, World');
    Test('11Hello, World!3333', 'Hello, World!');
end;
Ian Boyd, asked Nov 06 '13 17:11


2 Answers

The entire problem can be avoided by simply asking zlib to not wrap the deflate stream. I don't see the interface to zlib in the code in the question, but somewhere there is an initialization using deflateInit() or deflateInit2(). If you use deflateInit2(), you can provide -15 instead of 15 for the windowBits parameter to ask for unwrapped deflate output.
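In recent Delphi versions this is reachable without calling deflateInit2() directly: System.ZLib's TZCompressionStream has a constructor overload that takes a windowBits argument, presumably handed straight to deflateInit2(). The sketch below assumes that overload, Create(dest, compressionLevel, windowBits), exists in your Delphi version; the older ZLib unit behind the question's TCompressionStream.Create(clDefault, ...) may not have it. DeflateBytes is a hypothetical helper name. Because zlib produces the same DEFLATE data either way, the wrapped result should be exactly 6 bytes longer than the raw one.

```pascal
program RawDeflateDemo;
{$APPTYPE CONSOLE}
uses
  System.SysUtils, System.Classes, System.ZLib;

// Hypothetical helper: deflate Data through TZCompressionStream with the given
// windowBits. 15 = zlib format (RFC 1950: header + DEFLATE + Adler-32);
// -15 = raw DEFLATE (RFC 1951), which is what a ZIP entry stores.
// Assumes the Create(dest, compressionLevel, windowBits) overload of
// System.ZLib.TZCompressionStream found in recent Delphi versions.
function DeflateBytes(const Data: TBytes; WindowBits: Integer): TBytes;
var
  Source, Target: TMemoryStream;
  Compressor: TZCompressionStream;
begin
  Source := TMemoryStream.Create;
  Target := TMemoryStream.Create;
  try
    if Length(Data) > 0 then
      Source.WriteBuffer(Data[0], Length(Data));
    Compressor := TZCompressionStream.Create(Target, zcDefault, WindowBits);
    try
      Compressor.CopyFrom(Source, 0); // Count = 0: rewind Source and copy all of it
    finally
      Compressor.Free; // finishes the deflate stream into Target
    end;
    SetLength(Result, Integer(Target.Size));
    if Target.Size > 0 then
      Move(Target.Memory^, Result[0], Integer(Target.Size));
  finally
    Target.Free;
    Source.Free;
  end;
end;

var
  Input, Wrapped, Raw: TBytes;
begin
  Input := TEncoding.UTF8.GetBytes('Hello, World! Hello, World! Hello, World!');
  Wrapped := DeflateBytes(Input, 15);
  Raw := DeflateBytes(Input, -15);
  // Same DEFLATE payload, so Wrapped should be exactly 6 bytes longer than Raw
  // (2-byte CMF/FLG header + 4-byte Adler-32 trailer).
  WriteLn('zlib-wrapped: ', Length(Wrapped), ' bytes; raw DEFLATE: ', Length(Raw), ' bytes');
end.
```

Written this way, no byte-eating adapter is needed at all: the compressor writes raw DEFLATE straight into the ZIP's target stream.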

Mark Adler, answered Oct 31 '22 18:10


You need to postpone writing until you are sure you know that the bytes to be written are not the trailing bytes that must be eaten. That observation leads you to the thought that buffering will provide a solution.

So, I'd suggest this:

  1. Use a stream adapter that uses buffering.
  2. Eating the lead bytes is easy. Just send the first two bytes into oblivion.
  3. After that buffer the bytes to be written, and when it's time to flush, flush all but the final four bytes in the buffer.
  4. When you flush, copy the four bytes that you did not flush to the beginning of your buffer so that you don't lose them.
  5. When you close the stream, flush it, as you always would for a buffered stream. And use the same flushing technique as before, so that you hold on to the final four bytes. At this point you know that these are the final four bytes of the stream.

One requirement that the above approach demands is that your buffer must be larger in size than the number of trailing bytes to be stripped.
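The steps above can be sketched as a write-only adapter. This is a minimal sketch under stated assumptions, not a drop-in replacement for the question's class: TTrimStream and TrimThrough are hypothetical names, the buffer size is the caller's choice (something like 64 KB in practice), and, as in the question, nobody is expected to read through the adapter. Each Write only copies into the buffer; the target is written once per buffer-full, always holding back the last TrailCount bytes, and the final flush in the destructor discards exactly those bytes.

```pascal
program TrimStreamDemo;
{$IFDEF FPC}{$MODE DELPHI}{$ELSE}{$APPTYPE CONSOLE}{$ENDIF}
uses SysUtils, Classes, Math;

type
  // Hypothetical write-only adapter: drops the first LeadCount and last TrailCount bytes.
  TTrimStream = class(TStream)
  private
    FTarget: TStream;
    FLeadRemaining, FTrailCount: Integer;
    FBuffer: array of Byte;
    FUsed: Integer; // valid bytes at the start of FBuffer
    procedure FlushAllButTrail;
  public
    constructor Create(Target: TStream; LeadCount, TrailCount, BufferSize: Integer);
    destructor Destroy; override;
    function Read(var Buffer; Count: Longint): Longint; override;
    function Write(const Buffer; Count: Longint): Longint; override;
    function Seek(Offset: Longint; Origin: Word): Longint; override;
  end;
constructor TTrimStream.Create(Target: TStream; LeadCount, TrailCount, BufferSize: Integer);
begin
  inherited Create;
  if BufferSize <= TrailCount then // the buffer must be larger than the trailer
    raise Exception.Create('BufferSize must be larger than TrailCount');
  FTarget := Target;
  FLeadRemaining := LeadCount;
  FTrailCount := TrailCount;
  SetLength(FBuffer, BufferSize);
end;
destructor TTrimStream.Destroy;
begin
  FlushAllButTrail; // whatever is still held back now really is the trailer
  inherited Destroy;
end;
// Emit all but the last FTrailCount buffered bytes, then slide those to the front.
procedure TTrimStream.FlushAllButTrail;
var
  Keep, Emit: Integer;
begin
  Keep := Min(FTrailCount, FUsed);
  Emit := FUsed - Keep;
  if Emit = 0 then Exit;
  FTarget.WriteBuffer(FBuffer[0], Emit);
  if Keep > 0 then Move(FBuffer[Emit], FBuffer[0], Keep); // Move handles overlap
  FUsed := Keep;
end;
function TTrimStream.Write(const Buffer; Count: Longint): Longint;
var
  Src: PByte;
  Chunk: Integer;
begin
  Result := Count;
  Src := @Buffer;
  Chunk := Min(Count, FLeadRemaining); // first swallow leading bytes still owed
  Inc(Src, Chunk); Dec(Count, Chunk); Dec(FLeadRemaining, Chunk);
  while Count > 0 do // one target write per buffer-full, not per call
  begin
    if FUsed = Length(FBuffer) then FlushAllButTrail;
    Chunk := Min(Length(FBuffer) - FUsed, Count);
    Move(Src^, FBuffer[FUsed], Chunk);
    Inc(FUsed, Chunk); Inc(Src, Chunk); Dec(Count, Chunk);
  end;
end;
function TTrimStream.Read(var Buffer; Count: Longint): Longint;
begin
  raise EStreamError.Create('TTrimStream is write-only');
end;
function TTrimStream.Seek(Offset: Longint; Origin: Word): Longint;
begin
  Result := FTarget.Seek(Offset, Origin); // forwarded, as in the question's version
end;
// Demo helper: push Pieces through a 2-lead/4-trail trimmer; return the output.
function TrimThrough(const Pieces: array of AnsiString; BufferSize: Integer): AnsiString;
var
  Target: TMemoryStream;
  Trimmer: TTrimStream;
  I: Integer;
begin
  Target := TMemoryStream.Create;
  try
    Trimmer := TTrimStream.Create(Target, 2, 4, BufferSize);
    try
      for I := 0 to High(Pieces) do
        if Length(Pieces[I]) > 0 then
          Trimmer.WriteBuffer(Pieces[I][1], Length(Pieces[I]));
    finally
      Trimmer.Free; // final flush; the last 4 bytes are dropped
    end;
    SetString(Result, PAnsiChar(Target.Memory), Integer(Target.Size));
  finally
    Target.Free;
  end;
end;
begin
  WriteLn(TrimThrough(['11Hel', 'lo, ', 'World!33', '33'], 5)); // tiny buffer forces many flushes
  WriteLn(TrimThrough(['11Hello, World!3333'], 65536));
end.
```

Both lines should print `Hello, World!`. With a realistic buffer size, each of the question's million 30-90 byte rows costs one memory copy, and the target stream receives roughly one write per buffer-full rather than two writes per row.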

David Heffernan, answered Oct 31 '22 18:10