Azure / azure-functions-python-worker

Python worker for Azure Functions.
http://aka.ms/azurefunctions
MIT License

InputStream object cannot be used with pandas #832

Open tonybaloney opened 3 years ago

tonybaloney commented 3 years ago

In this example:

import logging
import os
from azure.cosmosdb.table.tableservice import TableService
import azure.functions as func
import pandas

def main(myblob: func.InputStream):
    logging.info(f"Python blob trigger function processed blob \n"
                 f"Name: {myblob.name}\n"
                 f"Blob Size: {myblob.length} bytes")
    df = pandas.read_excel(myblob, index_col=0)

    table_service = TableService(account_name=os.getenv('TABLE_STORAGE_ACCOUNT_NAME'), 
                                 account_key=os.getenv('TABLE_STORAGE_KEY'))

    records = df.to_dict(orient="records") 
    for record in records:
        table_service.insert_entity('ais', record)

Configured with function.json as:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "myblob",
      "type": "blobTrigger",
      "direction": "in",
      "path": "ais/{name}.xlsx",
      "connection": "anthonyshawstorage_STORAGE"
    }
  ]
}

The script fails because InputStream does not implement seek(). It does implement seekable(), which correctly returns False, but pandas calls seek() without checking seekable() first, so you get this error:

Exception while executing function: Functions.BlobTrigger Result: Failure
Exception: UnsupportedOperation: seek
Stack:   File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 355, in _handle__invocation_request
    call_result = await self._loop.run_in_executor(
  File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 542, in __run_sync_func
    return func(**params)
  File "/home/site/wwwroot/BlobTrigger/__init__.py", line 13, in main
    df = pandas.read_excel(myblob, index_col=0)
  File "/home/site/wwwroot/.python_packages/lib/site-packages/pandas/util/_decorators.py", line 299, in wrapper
    return func(*args, **kwargs)
  File "/home/site/wwwroot/.python_packages/lib/site-packages/pandas/io/excel/_base.py", line 336, in read_excel
    io = ExcelFile(io, storage_options=storage_options, engine=engine)
  File "/home/site/wwwroot/.python_packages/lib/site-packages/pandas/io/excel/_base.py", line 1057, in __init__
    ext = inspect_excel_format(
  File "/home/site/wwwroot/.python_packages/lib/site-packages/pandas/io/excel/_base.py", line 942, in inspect_excel_format
    stream.seek(0)

Technically, this is a bug in pandas, because it should check seekable() first.

Is there a way to make the InputStream seekable if it comes from blob storage?
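To make the seekable() point concrete, here is a minimal sketch of the check-then-fall-back pattern. NonSeekableStream is a hypothetical stand-in for func.InputStream (so the snippet runs without the Functions worker), and ensure_seekable is an illustrative helper name, not part of pandas or azure.functions.

```python
import io


class NonSeekableStream(io.BufferedIOBase):
    """Hypothetical stand-in for func.InputStream: readable, but not seekable."""

    def __init__(self, data: bytes):
        self._inner = io.BytesIO(data)

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return False

    def read(self, size: int = -1) -> bytes:
        return self._inner.read(size)


def ensure_seekable(stream):
    """Return the stream unchanged if it is seekable, else buffer it into BytesIO."""
    if stream.seekable():
        return stream
    return io.BytesIO(stream.read())


blob = NonSeekableStream(b"PK\x03\x04 placeholder bytes")

# Calling seek() directly fails, as in the traceback above.
try:
    blob.seek(0)
except io.UnsupportedOperation as exc:
    print(f"seek failed: {exc}")

# Checking seekable() first lets the caller fall back to an in-memory copy.
buffered = ensure_seekable(blob)
buffered.seek(0)
print(buffered.read(2))
```

In the function from the original post, that would presumably look like pandas.read_excel(ensure_seekable(myblob), index_col=0), at the cost of holding the whole blob in memory.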

diegopajarito commented 3 years ago

I'm having the same issue, but simply using the json library instead of pandas.

I assume this is related to the seekable() check.

lindacmsheard commented 2 years ago

+1: This is also an issue when trying to use PyPDF2's PdfFileReader class: https://pythonhosted.org/PyPDF2/PdfFileReader.html

lindacmsheard commented 2 years ago

I know this is late to the party and a slightly different case, but in case it helps anybody:

Follow up: I understand now that given the file comes over the network, it can't be seekable. 💡

My solution therefore is to use an io.BytesIO stream as an intermediary (BytesIO since I'm working with PDFs, which are binary; the original poster's .xlsx files are binary too, whereas text formats may call for io.StringIO).

The purpose of the code is to read a PDF, let the PyPDF2 library extract the first page, and save that page to the output binding. Like pandas in the original post, PyPDF2 requires seekable streams to work.

import logging

import azure.functions as func

from PyPDF2 import PdfFileReader, PdfFileWriter  # library that requires seekable streams
from io import BytesIO

def main(fullpdf: func.InputStream, page1: func.Out[func.InputStream]):
    logging.info(f"Python blob trigger function processed blob \n"
                 f"Name: {fullpdf.name}\n"
                 f"Blob Size: {fullpdf.length} bytes")

    pdfbytes = fullpdf.read()                                     # returns bytes
    logging.info(f"pdfbytes: {type(pdfbytes)}")

    # create intermediary input stream
    fullpdfstream = BytesIO(pdfbytes)                             # returns seekable stream
    logging.info(f"fullpdfstream: {type(fullpdfstream)}")

    pdf = PdfFileReader(fullpdfstream)

    pages = [0]
    pdfWriter = PdfFileWriter()

    for page_num in pages:
        pdfWriter.addPage(pdf.getPage(page_num))

    # create intermediary output stream
    page1stream = BytesIO()                                        # empty stream object that pyPDF2 class method can write to
    logging.info(f"page1stream: {type(page1stream)}")

    pdfWriter.write(page1stream)

    logging.info(f"page1stream.getvalue: {type(page1stream.getvalue())}")    # returns bytes

    page1.set(page1stream.getvalue())                         # set the func.Out[func.InputStream] object to stream 
                                                              # out the bytes to blob storage
    page1stream.close()

Comments welcome if this can be optimised or improved!
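One possible variation on the BytesIO approach above, for blobs too large to hold comfortably in memory, is to copy the stream to a temporary file in chunks and hand the library that file instead. This is only a sketch: NonSeekableStream is a hypothetical stand-in for func.InputStream, spool_to_tempfile is an illustrative name, and I have not verified whether the Functions worker actually delivers blob data lazily, so the memory saving is an assumption.

```python
import io
import shutil
import tempfile


class NonSeekableStream(io.BufferedIOBase):
    """Hypothetical stand-in for func.InputStream: readable, but not seekable."""

    def __init__(self, data: bytes):
        self._inner = io.BytesIO(data)

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return False

    def read(self, size: int = -1) -> bytes:
        return self._inner.read(size)


def spool_to_tempfile(stream, chunk_size: int = 1024 * 1024):
    """Copy a non-seekable stream to an anonymous temp file in chunks, then rewind it."""
    tmp = tempfile.TemporaryFile()
    shutil.copyfileobj(stream, tmp, chunk_size)  # reads chunk_size bytes at a time
    tmp.seek(0)
    return tmp


source = NonSeekableStream(b"%PDF-1.4 placeholder bytes")

# The temp file is deleted automatically when the with-block closes it.
with spool_to_tempfile(source, chunk_size=8) as seekable_copy:
    header = seekable_copy.read(5)
    seekable_copy.seek(0)  # seeking now works
    print(header)
```

The trade-off is disk I/O in place of memory, so for small files the plain BytesIO version is likely simpler and faster.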

jbatte47 commented 2 years ago

Piggybacking on this issue to mention that I'm getting a different error while trying to read a CSV using pandas 1.4.2:

import pandas as pd
import azure.functions as func

def main(file: func.InputStream):
  df = pd.read_csv(file)
  # use DataFrame...

☝️ code like this produces this error: Exception: UnsupportedOperation: read1

Not sure if the different file type makes this a totally different issue, or if they're linked. A fix for this seems like something that could help functions working on very large input files keep their memory footprint smaller. I am able to pass BytesIO(file.read()) in and continue working, but the resulting memory profile after calling file.read() is pretty large on sufficiently large blobs.
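A likely explanation for the read1 error, which I have not checked against the pandas source: for text parsing the binary handle gets wrapped in io.TextIOWrapper, which calls read1() on its buffer, and a stream derived from io.BufferedIOBase inherits a read1() that raises UnsupportedOperation unless it is overridden. The sketch below reproduces that with the standard library only. NonSeekableStream is a hypothetical stand-in for func.InputStream and Read1Adapter is an illustrative wrapper, not an existing API.

```python
import io


class NonSeekableStream(io.BufferedIOBase):
    """Hypothetical stand-in for func.InputStream: implements read() but not read1()."""

    def __init__(self, data: bytes):
        self._inner = io.BytesIO(data)

    def readable(self) -> bool:
        return True

    def read(self, size: int = -1) -> bytes:
        return self._inner.read(size)


class Read1Adapter(io.BufferedIOBase):
    """Thin wrapper that adds read1() by delegating to the wrapped stream's read()."""

    def __init__(self, raw):
        self._raw = raw

    def readable(self) -> bool:
        return True

    def read(self, size: int = -1) -> bytes:
        return self._raw.read(size)

    def read1(self, size: int = -1) -> bytes:
        return self._raw.read(size)


data = b"a,b\n1,2\n"

# Without read1(), decoding a line through TextIOWrapper fails.
try:
    io.TextIOWrapper(NonSeekableStream(data), encoding="utf-8").readline()
except io.UnsupportedOperation as exc:
    print(f"without read1: {exc}")

# With the adapter, text is decoded chunk by chunk, with no full in-memory copy.
text = io.TextIOWrapper(Read1Adapter(NonSeekableStream(data)), encoding="utf-8")
print(text.readline())
```

If that explanation holds, passing Read1Adapter(file) to pd.read_csv might avoid the BytesIO copy, but that is untested here and pandas may need other stream methods as well.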