I accept a file via POST. When I save it locally, I can read the content using file.read(), but file.name returns an incorrect value (16). When I try to find the file by this name, I get an error. What might be the problem?
My code:
@router.post(
    path="/upload",
    response_model=schema.ContentUploadedResponse,
)
async def upload_file(
        background_tasks: BackgroundTasks,
        uploaded_file: UploadFile = File(...)):
    uploaded_file.file.rollover()
    uploaded_file.file.flush()
    #shutil.copy(uploaded_file.file.name, f'../api/{uploaded_file.filename}')
    background_tasks.add_task(s3_upload, uploaded_file=fp)
    return schema.ContentUploadedResponse()
UploadFile is just a wrapper around SpooledTemporaryFile, which can be accessed as UploadFile.file.
SpooledTemporaryFile() [...] function operates exactly as TemporaryFile() does
And the documentation for TemporaryFile says:
Return a file-like object that can be used as a temporary storage area. [..] It will be destroyed as soon as it is closed (including an implicit close when the object is garbage collected). Under Unix, the directory entry for the file is either not created at all or is removed immediately after the file is created. Other platforms do not support this; your code should not rely on a temporary file created using this function having or not having a visible name in the file system.
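To make the quoted behaviour concrete, here is a minimal sketch using only the standard library. The SpooledTemporaryFile below is a stand-in for UploadFile.file, and persist_spooled and the destination path are hypothetical names chosen for illustration. It shows why file.name is not a path you can look up, and that copying the contents to a path you choose is the safer route:

```python
import shutil
import tempfile
from pathlib import Path
from typing import BinaryIO


def persist_spooled(src: BinaryIO, destination: Path) -> Path:
    """Copy a (possibly nameless) temporary file to a real path on disk."""
    src.seek(0)  # rewind: the data may already have been written or read
    with destination.open("wb") as out:
        shutil.copyfileobj(src, out)
    return destination


# Stand-in for UploadFile.file: a spooled temp file holding some bytes.
spooled = tempfile.SpooledTemporaryFile(max_size=1024)
spooled.write(b"example payload")

# While the data is still held in memory there is no file name to speak of.
name_in_memory = spooled.name

# After rollover() the data lives in an anonymous temp file. On Unix the
# "name" is typically just an integer file descriptor, not a usable path.
spooled.rollover()
name_on_disk = spooled.name

target = Path(tempfile.mkdtemp()) / "upload.bin"  # hypothetical destination
persist_spooled(spooled, target)
```

On Unix, a value such as 16 for file.name is most likely that file descriptor number, which is why searching the file system for it fails.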
async def endpoint
You should use the following async methods of UploadFile: write, read, seek and close. They are executed in a thread pool and awaited asynchronously.
For async writing files to disk you can use aiofiles. Example:
@app.post("/")
async def post_endpoint(in_file: UploadFile=File(...)):
    # ...
    async with aiofiles.open(out_file_path, 'wb') as out_file:
        content = await in_file.read()  # async read
        await out_file.write(content)  # async write

    return {"Result": "OK"}
Or in the chunked manner, so as not to load the entire file into memory:
@app.post("/")
async def post_endpoint(in_file: UploadFile=File(...)):
    # ...
    async with aiofiles.open(out_file_path, 'wb') as out_file:
        while content := await in_file.read(1024):  # async read chunk
            await out_file.write(content)  # async write chunk

    return {"Result": "OK"}
def endpoint
Also, I would like to cite several useful utility functions from this topic (all credits @dmontagu) using shutil.copyfileobj with the internal UploadFile.file. These functions can be invoked from def endpoints:
import shutil
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Callable

from fastapi import UploadFile


def save_upload_file(upload_file: UploadFile, destination: Path) -> None:
    try:
        with destination.open("wb") as buffer:
            shutil.copyfileobj(upload_file.file, buffer)
    finally:
        upload_file.file.close()


def save_upload_file_tmp(upload_file: UploadFile) -> Path:
    try:
        suffix = Path(upload_file.filename).suffix
        with NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            shutil.copyfileobj(upload_file.file, tmp)
            tmp_path = Path(tmp.name)
    finally:
        upload_file.file.close()
    return tmp_path


def handle_upload_file(
    upload_file: UploadFile, handler: Callable[[Path], None]
) -> None:
    tmp_path = save_upload_file_tmp(upload_file)
    try:
        handler(tmp_path)  # Do something with the saved temp file
    finally:
        tmp_path.unlink()  # Delete the temp file
Note: you'd want to use the above functions inside of def endpoints, not async def, since they make use of blocking APIs.
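If you do need to call a blocking copy like this from an async def endpoint, one option is to push it onto a worker thread. The sketch below uses asyncio.to_thread from the standard library (Python 3.9+). copy_to_disk and copy_to_disk_async are hypothetical names, and src stands in for UploadFile.file. Starlette also ships run_in_threadpool in starlette.concurrency, which should serve the same purpose.

```python
import asyncio
import shutil
from pathlib import Path
from typing import BinaryIO


def copy_to_disk(src: BinaryIO, destination: Path) -> None:
    """Blocking copy, the same idea as save_upload_file above."""
    with destination.open("wb") as buffer:
        shutil.copyfileobj(src, buffer)


async def copy_to_disk_async(src: BinaryIO, destination: Path) -> None:
    """Run the blocking copy in a worker thread so the event loop stays free."""
    await asyncio.to_thread(copy_to_disk, src, destination)
```

Inside an async def endpoint this would be awaited as `await copy_to_disk_async(upload_file.file, destination)`.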
You can save the uploaded files this way:
from fastapi import FastAPI, File, UploadFile

app = FastAPI()


@app.post("/upload-file/")
async def create_upload_file(uploaded_file: UploadFile = File(...)):
    file_location = f"files/{uploaded_file.filename}"
    with open(file_location, "wb+") as file_object:
        file_object.write(uploaded_file.file.read())
    return {"info": f"file '{uploaded_file.filename}' saved at '{file_location}'"}
You can also use the shutil.copyfileobj(...) method (see this detailed answer to how both are working behind the scenes).
So, as an alternative, you can write something like the below using shutil.copyfileobj(...) to achieve the file upload functionality.
import shutil

from fastapi import FastAPI, File, UploadFile

app = FastAPI()


@app.post("/upload-file/")
async def create_upload_file(uploaded_file: UploadFile = File(...)):
    file_location = f"files/{uploaded_file.filename}"
    with open(file_location, "wb+") as file_object:
        shutil.copyfileobj(uploaded_file.file, file_object)
    return {"info": f"file '{uploaded_file.filename}' saved at '{file_location}'"}
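As a rough illustration of the difference between the two variants: file.read() with no argument pulls the whole upload into memory before writing, whereas shutil.copyfileobj moves the data in fixed-size chunks. The loop below is a simplified sketch of that idea, not the actual standard library source. copy_in_chunks and its default chunk size are hypothetical.

```python
import io
from typing import BinaryIO


def copy_in_chunks(src: BinaryIO, dst: BinaryIO, chunk_size: int = 64 * 1024) -> int:
    """Copy src to dst one chunk at a time; return the number of bytes copied."""
    total = 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:  # empty bytes means end of file
            break
        dst.write(chunk)
        total += len(chunk)
    return total
```

Only one chunk is held in memory at a time, so peak memory use stays near chunk_size regardless of the upload size.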
In my case, I need to handle huge files, so I must avoid reading them all into memory. What I want is to save them to disk asynchronously, in chunks.
I'm experimenting with this and it seems to do the job (CHUNK_SIZE is quite arbitrarily chosen, further tests are needed to find an optimal size):
import os
import logging

from fastapi import FastAPI, BackgroundTasks, File, UploadFile

log = logging.getLogger(__name__)

app = FastAPI()

DESTINATION = "/"
CHUNK_SIZE = 2 ** 20  # 1MB


async def chunked_copy(src, dst):
    await src.seek(0)
    with open(dst, "wb") as buffer:
        while True:
            contents = await src.read(CHUNK_SIZE)
            if not contents:
                log.info(f"Src completely consumed\n")
                break
            log.info(f"Consumed {len(contents)} bytes from Src file\n")
            buffer.write(contents)


@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File(...)):
    fullpath = os.path.join(DESTINATION, file.filename)
    await chunked_copy(file, fullpath)
    return {"File saved to disk at": fullpath}
However, I'm quickly realizing that create_upload_file is not invoked until the file has been completely received. So, if this code snippet is correct, it will probably be beneficial to performance, but it will not enable anything like providing feedback to the client about the progress of the upload, and it will still perform a full data copy on the server. It seems silly to not be able to just access the original UploadFile temporary file, flush it and just move it somewhere else, thus avoiding a copy.
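One possible way around this, offered as a sketch rather than a tested recipe, is to skip UploadFile and consume the raw request body as it arrives. Starlette's Request exposes an async iterator, request.stream(), for that purpose. The body is then raw bytes rather than parsed multipart form data, so the client would have to send the file as the request body itself. The helper below is written against any async iterator of bytes so it can be tried without a server. write_stream is a hypothetical name, and asyncio.to_thread needs Python 3.9+.

```python
import asyncio
from pathlib import Path
from typing import AsyncIterator


async def write_stream(chunks: AsyncIterator[bytes], destination: Path) -> int:
    """Write chunks to disk as they arrive; return the total bytes written."""
    written = 0
    with destination.open("wb") as out:
        async for chunk in chunks:
            # Hand the blocking write to a worker thread.
            await asyncio.to_thread(out.write, chunk)
            written += len(chunk)
    return written
```

In an endpoint that takes a `request: Request` parameter, this would be called as `await write_stream(request.stream(), destination)`. Since the data goes straight to its final location, there is no second copy, and the running byte count could be used for progress reporting.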