Problematic
I have a PIL Image and i want to convert it to a bytes array. I can't save the image on my hard disk so i can't use the default open(file_path, 'rb')
function.
What i tried
To overturn this problem i'm trying to use the io
library doing this :
buf = io.BytesIO()
image.save(buf, format='JPEG')
b_image = buf.getvalue()
Considering image as a functional PIL Image.
the "b_image" will be used as argument for the Microsoft Azure cognitives services function read_in_stream()
If we look in the documentation, we can see that this function image argument have to be :
image xref:Generator
Required
An image stream.
Documentation available here
The issue
When i execute it i got the error :
File "C:...\envs\trainer\lib\site-packages\msrest\service_client.py", line 137, in stream_upload chunk = data.read(self.config.connection.data_block_size)
AttributeError: 'bytes' object has no attribute 'read'
There is no error in the client authentification or at another point because when i give as parameter an image imported with this line :
image = open("./1.jpg", 'rb')
Everything is working correctly..
Sources
I also saw this post that explains exactly what i want to do but in my case it's not working. Any idea would be appreciated.
When we use the method read_in_stream
, we need to provide a stream. But the code BytesIO.getvalue
will return the content of the stream as string or bytes. So please update code as below
buf = io.BytesIO()
image.save(buf, format='JPEG')
computervision_client.read_in_stream(buf)
For more details, please refer to here
Update
Regarding the issue, I suggest you use rest API to implement your need.
import io
import requests
from PIL import Image
import time
url = "{your endpoint}/vision/v3.1/read/analyze"
key = ''
headers = {
'Ocp-Apim-Subscription-Key': key,
'Content-Type': 'application/octet-stream'
}
// process image
...
with io.BytesIO() as buf:
im.save(buf, 'jpeg')
response = requests.request(
"POST", url, headers=headers, data=buf.getvalue())
# get result
while True:
res = requests.request(
"GET", response.headers['Operation-Location'], headers=headers)
status = res.json()['status']
if status == 'succeeded':
print(res.json()['analyzeResult'])
break
time.sleep(1)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With