
Can I read parquet from HTTP(s) octet-stream?

A backend endpoint returns a Parquet file as an octet-stream.

In pandas I can do something like this:

result = requests.get("https://..../file.parquet")
df = pd.read_parquet(io.BytesIO(result.content))

Can I do it in Dask somehow?

This code:

dd.read_parquet("https://..../file.parquet")

raises an exception (obviously, because the response is a bytes-like object):

  File "to_parquet_dask.py", line 153, in <module>
    main(*parser.parse_args())
  File "to_parquet_dask.py", line 137, in main
    download_parquet(
  File "to_parquet_dask.py", line 121, in download_parquet
    dd.read_parquet(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 313, in read_parquet
    read_metadata_result = engine.read_metadata(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 733, in read_metadata
    parts, pf, gather_statistics, base_path = _determine_pf_parts(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 148, in _determine_pf_parts
    elif fs.isdir(paths[0]):
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/asyn.py", line 88, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/asyn.py", line 69, in sync
    raise result[0]
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/asyn.py", line 25, in _runner
    result[0] = await coro
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/implementations/http.py", line 418, in _isdir
    return bool(await self._ls(path))
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/implementations/http.py", line 195, in _ls
    out = await self._ls_real(url, detail=detail, **kwargs)
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/implementations/http.py", line 150, in _ls_real
    text = await r.text()
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 1082, in text
    return self._body.decode(encoding, errors=errors)  # type: ignore
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x90 in position 7: invalid start byte

Update

With the fsspec change from @mdurant's answer, I get this error:

ValueError: Cannot seek streaming HTTP file

So I prepended "simplecache::" to my URL, and now I get the following:

Traceback (most recent call last):
  File "to_parquet_dask.py", line 161, in <module>
    main(*parser.parse_args())
  File "to_parquet_dask.py", line 145, in main
    download_parquet(
  File "to_parquet_dask.py", line 128, in download_parquet
    dd.read_parquet(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 313, in read_parquet
    read_metadata_result = engine.read_metadata(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 733, in read_metadata
    parts, pf, gather_statistics, base_path = _determine_pf_parts(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 185, in _determine_pf_parts
    pf = ParquetFile(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fastparquet/api.py", line 127, in __init__
    raise ValueError("Opening directories without a _metadata requires"
ValueError: Opening directories without a _metadata requiresa filesystem compatible with fsspec

Temporary workaround

This approach may be dirty and suboptimal, but it more or less works:

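import io

import dask
import dask.dataframe as dd
import pandas as pd
import requests

@dask.delayed
def parquet_from_http(url, token):
    # Fetch the whole file in a single request and parse it from memory.
    result = requests.get(
        url,
        headers={'Authorization': token}
    )
    return pd.read_parquet(io.BytesIO(result.content))

# url, token and meta are defined elsewhere in the script.
delayed_download = parquet_from_http(url, token)
df = dd.from_delayed(delayed_download, meta=meta)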

P.S. The meta argument is necessary in this approach. Without it, Dask calls the function twice, once to infer meta and then again to compute the result, so two requests are made.
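
For illustration, meta can be an empty pandas DataFrame that carries only the expected column names and dtypes. A minimal sketch follows; the column names "timestamp" and "speed" are hypothetical placeholders, not taken from the actual file, and would need to match the real schema:

```python
import pandas as pd

# Hypothetical schema: replace the names and dtypes with those of the real file.
meta = pd.DataFrame(
    {
        "timestamp": pd.Series(dtype="datetime64[ns]"),
        "speed": pd.Series(dtype="float64"),
    }
)

# meta has the right columns and dtypes but no rows, so building it costs no request.
print(meta.dtypes)
```

Passing an object like this as meta=meta to dd.from_delayed should let Dask skip the extra call it would otherwise make to infer the schema.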

asked Nov 06 '22 00:11 by bc30138


1 Answer

This is not an answer, but I believe the following change in fsspec will fix your problem. If you would be willing to try and confirm, we can make this a patch.

--- a/fsspec/implementations/http.py
+++ b/fsspec/implementations/http.py
@@ -472,7 +472,10 @@ class HTTPFileSystem(AsyncFileSystem):

     async def _isdir(self, path):
         # override, since all URLs are (also) files
-        return bool(await self._ls(path))
+        try:
+            return bool(await self._ls(path))
+        except (FileNotFoundError, ValueError):
+            return False

(we can put this in a branch, if that makes it easier for you to install)

-edit-

The second problem (which is the same thing in both parquet engines) stems from the server either not providing the size of the file, or not allowing range-gets. The parquet format requires random access to the data to be able to read. The only way to get around this (short of improving the server) is to copy the whole file locally, e.g., by prepending "simplecache::" to your URL.
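
To make the two conditions above concrete, here is a rough sketch using only the standard library. Both function names (supports_random_access, download_to_local) are hypothetical helpers written for this illustration, not part of fsspec or Dask. The first inspects response headers for a size and for byte-range support; the second is a hand-rolled stand-in for what "simplecache::" is meant to do, copying the whole body to a local temporary file. Note that some servers omit Accept-Ranges even when they do honour range requests, so the check is only a hint.

```python
import shutil
import tempfile
import urllib.request
from typing import Mapping, Optional


def supports_random_access(headers: Mapping[str, str]) -> bool:
    """Return True if the headers advertise a file size and byte-range support."""
    # Header names are case-insensitive, so normalise them before looking up.
    lowered = {name.lower(): value for name, value in headers.items()}
    has_size = lowered.get("content-length", "").strip().isdigit()
    accepts_ranges = lowered.get("accept-ranges", "").strip().lower() == "bytes"
    return has_size and accepts_ranges


def download_to_local(url: str, headers: Optional[Mapping[str, str]] = None) -> str:
    """Copy the whole response body to a temporary file and return its path."""
    request = urllib.request.Request(url, headers=dict(headers or {}))
    with urllib.request.urlopen(request) as response:
        # delete=False keeps the file on disk after the handle is closed.
        with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as target:
            shutil.copyfileobj(response, target)
            return target.name
```

If supports_random_access returns False for the headers of a HEAD or GET response, the file probably has to be fetched in full first, for example with download_to_local(url, {"Authorization": token}), and the returned local path can then be read like any other Parquet file. The caller is responsible for deleting the temporary file afterwards.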

answered Nov 12 '22 09:11 by mdurant