Esri / arcgis-python-api

Documentation and samples for ArcGIS API for Python
https://developers.arcgis.com/python/
Apache License 2.0

Bug in Lib\site-packages\arcgis\_impl\common\_query.py #1895

Open hildermesmedeiros opened 1 month ago

hildermesmedeiros commented 1 month ago

Describe the bug
When querying a FeatureLayer with many records over an unstable connection, fl.query().sdf can return duplicated rows (more rows than actually exist in the service). The duplication appears to come from the retry handling in Lib\site-packages\arcgis\_impl\common\_query.py.

To Reproduce
Steps to reproduce the behavior:

# FeatureLayer with many data points and usually an unstable connection.
import pandas as pd
from arcgis.gis import GIS
from arcgis.features import FeatureLayer

gis = GIS('pro')
print(gis.users.me)

fl = FeatureLayer('url/FeatureServer/11')
df: pd.DataFrame = fl.query().sdf

error:

# No error, but there are more rows than actually exist in the data.
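
For reference, here is a quick way to confirm that the extra rows are duplicates (just a sketch; it assumes the object-ID column in the DataFrame matches the layer's objectIdField property):

# Sketch: compare the server-side count with the DataFrame and look for
# duplicated object IDs (assumes the column name matches objectIdField).
oid = fl.properties.objectIdField
server_count = fl.query(where="1=1", return_count_only=True)
print(f"server count: {server_count}, dataframe rows: {len(df)}")
print(f"duplicated ids: {df[oid].duplicated().sum()}")

# Possible client-side workaround until this is fixed: drop the duplicates.
df = df.drop_duplicates(subset=oid)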

Screenshots
(screenshot attached)

Expected behavior
Check for duplicated data before inserting records into the result set, so that fl.query().sdf returns exactly the rows that exist in the service.

Platform (please complete the following information):


nanaeaubry commented 1 month ago

@hildermesmedeiros Can you provide data where this is happening? We are not able to reproduce it with any of our layers.

(screenshots attached)

hildermesmedeiros commented 1 month ago

@nanaeaubry This error occurs due to server or connection instability. It won't be possible to reproduce it unless you simulate an unstable connection or mock the code in this file to simulate an error.

However, just by reading the code, it seems clear to me that the error (duplicate data that doesn't exist in the service) is coming from there.

I will try to create an example tomorrow....

I think creating a feature layer with 10k rows and randomly generating errors in the while loop might be enough to prove this.
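
Roughly, the idea would be something like this (just a sketch; the flaky decorator below is hypothetical and would have to be patched onto whatever function _query.py uses to hit the server):

# Sketch only: make a callable fail at random, to simulate an unstable
# connection while the query loop pages through results.
import random
from functools import wraps

def flaky(fail_probability=0.1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if random.random() < fail_probability:
                raise ConnectionError("simulated 504 GATEWAY_TIMEOUT")
            return func(*args, **kwargs)
        return wrapper
    return decorator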

hildermesmedeiros commented 2 weeks ago

@nanaeaubry I could not reproduce the duplicated-data error, but something still seems off.

import pandas as pd
import numpy as np
import random
from faker import Faker
import wrapt
from functools import partial
import time
from arcgis.gis import GIS
from arcgis.features import FeatureLayer
from dask import delayed, compute
from dask.diagnostics import ProgressBar
from requests.adapters import HTTPAdapter
from requests.models import Response

gis = GIS('pro')

# Initialize Faker and seed for reproducibility (requires the faker package: pip install faker)
print('creating fake data')
fake = Faker()
np.random.seed(42)
Faker.seed(42)

num_rows = 100000

data = {
    'ct_id': np.random.randint(1, 10000, num_rows),
    'ct_name': [fake.city() for _ in range(num_rows)],
    'ct_state': [fake.state_abbr() for _ in range(num_rows)],
    'ct_country': ['USA'] * num_rows,
    'data_date': pd.date_range(start='2022-01-01', periods=num_rows, freq='H'),
    'data_date_br': pd.date_range(start='2022-01-01', periods=num_rows, freq='H').strftime('%d/%m/%Y %H:%M:%S'),
    'data_humidity': np.random.uniform(10, 100, num_rows),
    'data_pressure': np.random.uniform(950, 1050, num_rows),
    'data_rain_precipitation': np.random.uniform(0, 50, num_rows),
    'data_wind_velocity': np.random.uniform(0, 40, num_rows),
    'data_wind_direction': [random.choice(['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']) for _ in range(num_rows)],
    'data_wind_directiondegrees': np.random.uniform(0, 360, num_rows),
    'data_wind_gust': np.random.uniform(0, 50, num_rows),
    'data_temperature': np.random.uniform(-10, 40, num_rows),
}

# Generate the ct_key by combining longid (ct_id) and datetime in milliseconds
data['ct_key'] = [
    f"{data['ct_id'][i]}{int(time.mktime(data['data_date'][i].timetuple()) * 1000)}"
    for i in range(num_rows)
]

# Create the DataFrame
df = pd.DataFrame(data)

# Set appropriate data types
df['ct_id'] = df['ct_id'].astype('Int32')
df['ct_name'] = df['ct_name'].astype('string')
df['ct_state'] = df['ct_state'].astype('string')
df['ct_country'] = df['ct_country'].astype('string')
df['data_date'] = pd.to_datetime(df['data_date'])
df['data_date_br'] = df['data_date_br'].astype('string')
df['data_humidity'] = df['data_humidity'].astype('Float64')
df['data_pressure'] = df['data_pressure'].astype('Float64')
df['data_rain_precipitation'] = df['data_rain_precipitation'].astype('Float64')
df['data_wind_velocity'] = df['data_wind_velocity'].astype('Float64')
df['data_wind_direction'] = df['data_wind_direction'].astype('string')
df['data_wind_directiondegrees'] = df['data_wind_directiondegrees'].astype('Float64')
df['data_wind_gust'] = df['data_wind_gust'].astype('Float64')
df['data_temperature'] = df['data_temperature'].astype('Float64')
df['ct_key'] = df['ct_key'].astype('string')

print('publishing fake content')
fc = gis.content.import_table(df,service_name='testrow', title='test row')
table_url = fc.tables[0].url
# table_url = 'https://services.arcgis.com/qFQYQQeTXZSPY7Fs/arcgis/rest/services/testrow/FeatureServer/0'
fl = FeatureLayer(table_url, gis=gis)

print('-------starting-----')

df1 = fl.query().sdf
print(f'should be = {num_rows}')
print(f"df1: {df1.ObjectId.count()}") #missing data

##### Try to get the data with Dask in batches (batches of 1000)
class DaskFetcherError(Exception):
    def __init__(self, message):
        super().__init__(message)
        self.message = message

def retry_decorator(wrapped=None, *, retry_num: int = 4, retry_sleep_sec: float = 10, backoff=True, backoff_factor: float=1.):
    """
    Decorador de retry.
    :param retry_num: número inteiro de tentativas
    :param retry_sleep_sec: quantidade de segundos para esperar entre tentativas
    :param backoff: True para ligar espera exponencial retry_sleep_sec * backoff_factor * (2 ** (attempt - 1))
    :param backoff_factor:
    :return:
    """
    if wrapped is None:
        return partial(retry_decorator, retry_num=retry_num, retry_sleep_sec=retry_sleep_sec,
                       backoff=backoff, backoff_factor=backoff_factor)

    @wrapt.decorator
    def wrapper(wrapped, instance, args, kwargs):
        try_error = False
        for attempt in range(1, retry_num + 1, 1):
            try:
                return wrapped(*args, **kwargs)
            except Exception as err:
                if not try_error:
                    # arcpy.AddMessage(f'def {wrapped.__name__} failed')
                    print(f'def {wrapped.__name__} failed')
                    try_error = True
                assert retry_num >= 1
                assert retry_sleep_sec > 0.
                assert backoff_factor >= 1.
                if backoff:
                    total_sleep_sec = retry_sleep_sec * backoff_factor * (2 ** (attempt - 1))
                else:
                    total_sleep_sec = retry_sleep_sec
                # arcpy.AddMessage(f'Retrying def {wrapped.__name__}, attempt {attempt}, sleeping {total_sleep_sec:.2f}s')
                print(f'Retrying, attempt {attempt}, sleeping {total_sleep_sec:.2f} s')
                time.sleep(total_sleep_sec)
        return wrapped(*args, **kwargs)
    return wrapper(wrapped)

class FeatureLayerDataFetcher:
    def __init__(self, gis, url):
        """
        Initialize the FeatureLayerDataFetcher with an existing GIS object and the URL of the feature layer.

        :param gis: An authenticated arcgis.gis.GIS object.
        :param url: The URL of the feature layer.
        """
        self.gis = gis
        self.url = url
        self.loading_msg = 'Fetching Layers in parallel with Dask: '

    def get_total_rows(self):
        """
        Get the total number of rows in the feature layer.

        :return: Total number of rows.
        """
        feature_layer = FeatureLayer(self.url, gis=self.gis)
        query_result = feature_layer.query(where="1=1", return_count_only=True)
        return query_result

    def fetch_data_in_batches(self, batch_size=1000):
        """
        Fetch data from the feature layer in batches of a specified size using Dask for parallelization.

        :param batch_size: The number of rows to fetch in each batch. Default is 1000.
        :return: List of DataFrames containing all fetched data.
        """
        # Get the total number of rows
        total_rows = self.get_total_rows()
        offsets = range(0, total_rows, batch_size)
        # Create a list of delayed objects for each batch
        tasks = [delayed(self.get_data_batch)(self, offset, batch_size) for offset in offsets]

        try:
            print(self.loading_msg)
            with ProgressBar():
                results = compute(*tasks, scheduler='threads', optimize_graph=False)
        except Exception as e:
            print(e)
            raise DaskFetcherError('Error fetching data in batches')

        return results

    @delayed
    @retry_decorator
    def get_data_batch(self, offset, batch_size):
        """
        Fetch a batch of data from the feature layer.

        :param offset: The starting offset for the batch.
        :param batch_size: The number of rows to fetch in this batch.
        :return: Pandas DataFrame of the fetched data.
        """
        try:
            fl = FeatureLayer(self.url, gis=self.gis)
            query_result = fl.query(where="1=1",
                                    out_fields="*",
                                    return_all_records=False,
                                    result_offset=offset,
                                    result_record_count=batch_size,
                                    return_exceeded_limit_features=False)
            df = query_result.sdf
            if not df.empty:
                # Rename columns based on aliases
                columns_alias = {f.name: f.alias for f in fl.properties.fields}
                df = df.rename(columns=columns_alias)
                return df

            return pd.DataFrame([])

        except Exception as e:
            print(e)
            raise DaskFetcherError(f'Error fetching batch at offset {offset}')

fetcher = FeatureLayerDataFetcher(gis, table_url)
total_rows = fetcher.get_total_rows()
print(f'total_rows: {total_rows}')
try:
    data_batches = fetcher.fetch_data_in_batches()

    # Concatenate all DataFrames into one if needed
    df3 = pd.concat(data_batches, ignore_index=True)
    print(f'should be = {num_rows}')
    print(f"df3: {df3.ObjectId.count()}")
except DaskFetcherError as e:
    print(e.message)

class NExceptionTimeoutHTTPAdapter(HTTPAdapter):
    def __init__(self, fail_probability=0.1, *args, **kwargs):
        """
        HTTP Adapter that randomly fails with a 504 Gateway Timeout error
        with a specified probability.

        :param fail_probability: Probability of simulating a 504 error (0 <= fail_probability <= 1).
        """
        super().__init__(*args, **kwargs)
        self.fail_probability = fail_probability

    def send(self, request, *args, **kwargs):
        # Simulate failure with a certain probability
        if random.random() < self.fail_probability:
            response = Response()
            response.status_code = 504
            response._content = b'''
            {
                "success": false,
                "error": {
                    "code": 504,
                    "message": "HTTP Error 504: GATEWAY_TIMEOUT",
                    "details": ["The service was not able to fulfill the request, possibly due to invalid input, or the service may not be functioning properly."]
                }
            }
            '''
            response.url = request.url
            return response

        # Proceed with the actual request if no failure is simulated
        response = super().send(request, *args, **kwargs)
        return response

print('testing unstable server')

session = gis._con._session
adapter = NExceptionTimeoutHTTPAdapter()
session.mount('http://', adapter)
session.mount('https://', adapter)
df2 = fl.query().sdf

print(f'should be = {num_rows}')
print(f"df2: {df2.ObjectId.count()}")

if __name__ == '__main__':
    print('end')

Output:

creating fake data
publishing fake content
-------starting-----
should be = 100000
df1: 99000
total_rows: 100000
Fetching Layers in parallel with Dask: 
[####################################### ] | 99% Completed | 12.22 sAn error occurred.
(Error Code: 503)
def get_data_batch failed
Retrying, attempt 1, sleeping 10.00 s
[########################################] | 100% Completed | 24.92 s
should be = 100000
df3: 100000
testing unstable server
should be = 100000
df2: 2000
end

I would look at _query.py with some extra care.
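
To illustrate the kind of failure mode I suspect (this is only a sketch, not the actual _query.py code): a paging loop that retries failed requests without tracking which pages were already appended can both duplicate a page and silently drop one.

# Illustrative sketch only -- NOT the actual _query.py code.
# A paging loop that mishandles retries can append a page twice (duplicates)
# or advance past a page that never succeeded (missing rows).
import random

def fetch_page(offset, page_size, data):
    # Simulated server call that sometimes fails, like an unstable connection.
    if random.random() < 0.1:
        raise ConnectionError("simulated 504 GATEWAY_TIMEOUT")
    return data[offset:offset + page_size]

def buggy_paged_query(data, page_size=1000, max_retries=3):
    features, offset = [], 0
    while offset < len(data):
        for attempt in range(max_retries):
            try:
                page = fetch_page(offset, page_size, data)
            except ConnectionError:
                continue  # retry the same offset
            features.extend(page)
            # Missing `break` here: a page that already succeeded is fetched
            # and appended again on later attempts -> duplicated records.
        # The offset advances even if every attempt failed -> a whole page is
        # silently skipped -> fewer rows than the service actually has.
        offset += page_size
    return features

# e.g. len(buggy_paged_query(list(range(100000)))) will usually differ from 100000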

PS: parallel processing is effectively disabled here because I would have had to write more code; you use lazy loading, and the code I used as a template won't work with lazy loading.

But one can get the idea. The query is not working properly... or so it seems; I might be doing something wrong.

I just want to do fl.query().sdf and get all the data.