josephramsay / lds_replicate

Replication scripts for LDS (LINZ Data Service).
http://data.linz.govt.nz/

Implement seamless partitioning of large datasets #4

Closed josephramsay closed 11 years ago

josephramsay commented 11 years ago

Partition large datasets into more easily processed groups to avoid timeouts.

Partition v:x772 (NZ Primary Parcels) into manageable subsets using higher-level user strategies: cql_filter=bbox/lower<id<upper -g dynamic_layer_group1

josephramsay commented 11 years ago

When using partitions we cannot use the driver copy method, since it follows a delete/replace methodology. For this reason partitioned downloads must use feature-by-feature copy (which is going to be very slow for big layers).

method:
does layer need to be partitioned?
    P=0
    while data remains:
        http://wfs...&cql_filter=id between P and P+INCR
        P=P+INCR
else:

josephramsay commented 11 years ago

Selection ranges using "id between {partition_start} and {partition_start+partition_size}" fail if ids are clustered into discontinuous blocks. Large empty blocks make the code think processing is finished, and increasing the block size to span the gaps will crash WFS.
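The failure mode can be reproduced without a server. The sketch below is only an illustration: `fetch_range` is a hypothetical in-memory stand-in for the WFS request, and the id values are invented. It steps the window by `size + 1` because `between` is inclusive at both ends.

```python
def fetch_range(ids, start, size):
    """Hypothetical stand-in for a WFS request filtered by
    'id between start and start+size' (inclusive at both ends)."""
    return [i for i in ids if start <= i <= start + size]


def range_partition(ids, size):
    """Walk fixed-width id windows and stop at the first empty one."""
    collected = []
    start = 0
    while True:
        batch = fetch_range(ids, start, size)
        if not batch:
            # An empty window is taken to mean "no data remains".
            break
        collected.extend(batch)
        start += size + 1  # step past the inclusive upper bound
    return collected


# Two clusters of ids separated by a large gap (invented values).
ids = list(range(0, 250)) + list(range(100000, 100250))
got = range_partition(ids, size=99)
print(len(got), "of", len(ids), "features fetched")
```

The loop stops at the first empty window after id 249, so the second cluster is never requested.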

A new alternative, using maxfeatures={partition_size}&sortBy={primary_key}&id>{last_index}, avoids the above problem.
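A minimal sketch of that loop follows, under stated assumptions: the `id>{last_index}` condition is assumed to travel in a `cql_filter` parameter, `fetch_page` is a hypothetical in-memory stand-in for the server, and ids are assumed to be non-negative integers. It is not the code used in the scripts.

```python
from urllib.parse import urlencode


def page_query(primary_key, partition_size, last_index):
    """Query-string fragment for one partition. Carrying the id condition
    in cql_filter is an assumption, not confirmed by the thread."""
    return urlencode({
        "maxfeatures": partition_size,
        "sortBy": primary_key,
        "cql_filter": "{0}>{1}".format(primary_key, last_index),
    })


def fetch_page(ids, partition_size, last_index):
    """Hypothetical stand-in for the server: ids above last_index,
    sorted ascending and capped at partition_size."""
    return sorted(i for i in ids if i > last_index)[:partition_size]


def keyset_partition(ids, partition_size):
    """Fetch pages until one comes back empty, resuming after the last id seen."""
    collected = []
    last_index = -1  # assumes ids are non-negative
    while True:
        batch = fetch_page(ids, partition_size, last_index)
        if not batch:
            break
        collected.extend(batch)
        last_index = batch[-1]
    return collected


# Two clusters of ids separated by a large gap (invented values).
ids = list(range(0, 250)) + list(range(100000, 100250))
everything = keyset_partition(ids, partition_size=100)
print(len(everything), "of", len(ids), "features fetched")
print(page_query("id", 100, 249))
```

Because each request resumes from the last id actually returned, gaps in the id sequence cost nothing and an empty page can only mean the data is exhausted.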

josephramsay commented 11 years ago

ds.CopyLayer can use OGR_WFS_PAGING_ALLOWED and OGR_WFS_PAGE_SIZE to segment large downloads, but since this doesn't work for incremental updates, where we use layer.CreateFeature(new_feat), the cql_filter method has to be retained.

palmerj commented 11 years ago

I don't think you should be using CopyLayer because of the need to replicate a selected set of user-defined columns. However, I'm pretty sure it should work anyway. Do you have the correct config options set?

See this Python example, which definitely uses paging when a filter is set:

from osgeo import gdal, ogr

wfs_drv = ogr.GetDriverByName('WFS')

gdal.SetConfigOption('CPL_DEBUG', 'ON')
gdal.SetConfigOption('OGR_WFS_LOAD_MULTIPLE_LAYER_DEFN', 'NO')
gdal.SetConfigOption('OGR_WFS_PAGING_ALLOWED', 'YES')
gdal.SetConfigOption('OGR_WFS_PAGE_SIZE', '10000')

ds = ogr.Open('WFS:http://wfs.data.linz.govt.nz/API-KEY/wfs')

all_parcels = ds.GetLayerByName("v:x1571")
all_parcels.SetAttributeFilter("status='Approved as to Survey'")

feature_count = 0
feat = all_parcels.GetNextFeature()
while feat is not None:
    feat = all_parcels.GetNextFeature()
    feature_count += 1
feat = None

print("Number of features returned was: " + str(feature_count))

exit(0)

Because I have debug set, you can see the URL being sent to the server, and it should contain the startindex and maxfeatures parameters.

palmerj commented 11 years ago

Just tested ds.CopyLayer method and that works as well.

from osgeo import gdal, ogr

wfs_drv = ogr.GetDriverByName('WFS')

gdal.SetConfigOption('CPL_DEBUG', 'ON')
gdal.SetConfigOption('OGR_WFS_LOAD_MULTIPLE_LAYER_DEFN', 'NO')
gdal.SetConfigOption('OGR_WFS_PAGING_ALLOWED', 'YES')
gdal.SetConfigOption('OGR_WFS_PAGE_SIZE', '10000')

wfs_ds = ogr.Open('WFS:http://wfs.data.linz.govt.nz/API-KEY/wfs')

all_parcels = wfs_ds.GetLayerByName("v:x1571")
all_parcels.SetAttributeFilter("status='Approved as to Survey'")
mem_ds = ogr.GetDriverByName('Memory').CreateDataSource('')
all_parcels_copy = mem_ds.CopyLayer(all_parcels, 'all_parcel_copy')

josephramsay commented 11 years ago

Thanks, maybe my problem description is unclear... CopyLayer is used when cloning a layer (non incremental) and unwanted columns are deleted afterward. This is for speed and there isn't a problem with this... The difficulty comes from incremental where we get insert/delete instructions from the wfs document and must process each feature individually and strip fields at the feature level to copy into an existing destination layer. That's when the filter method is useful.
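To make the incremental case concrete, here is a rough sketch of per-feature processing. Everything in it is hypothetical: features are modelled as plain dicts, the change list as `(action, feature)` pairs, and the destination as a dict keyed by primary key. In the real scripts the destination is an OGR layer and inserts go through layer.CreateFeature(new_feat).

```python
def strip_fields(feature, wanted):
    """Keep only the user-selected columns of one feature."""
    return {name: value for name, value in feature.items() if name in wanted}


def apply_changes(destination, changes, wanted, key="id"):
    """Apply insert/delete instructions one feature at a time.

    destination: dict mapping primary key -> stored feature (hypothetical model)
    changes:     iterable of (action, feature) pairs
    wanted:      set of column names to keep; must include the key column
    """
    for action, feature in changes:
        if action == "delete":
            destination.pop(feature[key], None)
        elif action == "insert":
            destination[feature[key]] = strip_fields(feature, wanted)
        else:
            raise ValueError("unknown action: %r" % (action,))
    return destination


# Invented sample data: an existing layer plus one changeset.
layer = {1: {"id": 1, "status": "Current"}}
changes = [
    ("delete", {"id": 1}),
    ("insert", {"id": 2, "status": "Current", "audit_id": 99}),
]
apply_changes(layer, changes, wanted={"id", "status"})
print(layer)
```

Since each feature is rewritten before it is stored, the unwanted column never reaches the destination, which is why this path cannot be handed to a whole-layer copy.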

palmerj commented 11 years ago

Quite a bad idea to delete columns after a table is created. In terms of MSSQL and PostgreSQL databases, the on-disk representation will almost double unless costly operations (VACUUM FULL in the case of PostgreSQL) are used to re-compact and reclaim dead row disk space.

I would move to a feature-by-feature processing loop, full stop, primarily because of re-projection and column subsets. It may be slower, but it will simplify the code a lot.


From: josephramsay [mailto:notifications@github.com] Sent: Friday, 11 January 2013 9:11 a.m. To: josephramsay/LDS Cc: Jeremy Palmer Subject: Re: [LDS] Implement seamless partitioning of large datasets (#4)


palmerj commented 11 years ago

Just checked the performance of copying the features using a GetNextFeature/CreateFeature loop, and its performance seems better (x2) than CopyLayer when going to PostgreSQL. Are you sure you are using transactions and the GDAL PG_USE_COPY configuration option? See my examples below.

postgis_copy_layer.py:

from osgeo import gdal, ogr

try:
    progress = gdal.TermProgress_nocb
except:
    progress = gdal.TermProgress

gdal.SetConfigOption('CPL_DEBUG', 'OFF')
gdal.SetConfigOption('PG_USE_COPY', 'YES')

fgdb_ds = ogr.Open('/home/jpalmer/Data/LDS/FGDB/nz_parcels.gdb')
all_parcels = fgdb_ds.GetLayerByName("nz_parcels")
layer_count = all_parcels.GetFeatureCount(force=0)

# Open in update mode so that CopyLayer can create the destination layer.
pg_ds = ogr.Open('PG:dbname=bde_db', update=1)
all_parcels_copy = pg_ds.CopyLayer(
    all_parcels,
    'all_parcel_copyLayer',
    ['OVERWRITE=YES', 'GEOMETRY_NAME=shape', 'SCHEMA=public']
)

time python postgis_copy_layer.py

real    7m37.123s
user    4m25.481s
sys     0m9.149s

postgis_copy_feature.py:

from osgeo import gdal, ogr

try:
    progress = gdal.TermProgress_nocb
except:
    progress = gdal.TermProgress

gdal.SetConfigOption('CPL_DEBUG', 'OFF')
gdal.SetConfigOption('PG_USE_COPY', 'YES')

fgdb_ds = ogr.Open('/home/jpalmer/Data/LDS/FGDB/nz_parcels.gdb')
all_parcels = fgdb_ds.GetLayerByName("nz_parcels")
layer_count = all_parcels.GetFeatureCount(force=0)

# Open in update mode so that the destination layer can be created.
pg_ds = ogr.Open('PG:dbname=bde_db', update=1)

all_parcels_copy_feat = pg_ds.CreateLayer(
    'all_parcel_copy_feature',
    srs=all_parcels.GetSpatialRef(),
    geom_type=all_parcels.GetGeomType(),
    options=['OVERWRITE=YES', 'GEOMETRY_NAME=shape', 'SCHEMA=public']
)
lyr_defn = all_parcels.GetLayerDefn()
for i in range( lyr_defn.GetFieldCount() ):
    field = lyr_defn.GetFieldDefn( i )
    new = ogr.FieldDefn( field.GetNameRef(), field.GetType() )
    new.SetWidth( field.GetWidth() )
    new.SetPrecision( field.GetPrecision() )
    all_parcels_copy_feat.CreateField( new )

count = 0.0
progress( count )

all_parcels.ResetReading()
feat = all_parcels.GetNextFeature()
all_parcels_copy_feat.StartTransaction()
while feat is not None:
    all_parcels_copy_feat.CreateFeature(feat)
    feat.Destroy()
    count += 1
    feat = all_parcels.GetNextFeature()
    progress( count / float(layer_count)  )
feat = None
all_parcels_copy_feat.CommitTransaction()

exit(0)

time python postgis_copy_feature.py
0...10...20...30...40...50...60...70...80...90...100 - done.

real    3m51.078s
user    3m10.736s
sys     0m5.928s
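As a quick check of the "x2" estimate, the two `real` timings can be converted to seconds and divided. The helper `to_seconds` is a hypothetical name written for this note; it only handles the `NmS.SSSs` form that `time` printed here.

```python
import re


def to_seconds(stamp):
    """Convert a `time` value such as '7m37.123s' to seconds."""
    match = re.fullmatch(r"(\d+)m(\d+(?:\.\d+)?)s", stamp)
    if match is None:
        raise ValueError("unrecognised time value: %r" % (stamp,))
    return int(match.group(1)) * 60 + float(match.group(2))


copy_layer = to_seconds("7m37.123s")    # postgis_copy_layer.py, real
copy_feature = to_seconds("3m51.078s")  # postgis_copy_feature.py, real
speedup = copy_layer / copy_feature
print("CopyLayer / feature loop = %.2f" % speedup)
```

The ratio comes out a little under 2, consistent with the estimate given with the examples.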

josephramsay commented 11 years ago

Driver and feature copy were temporarily replaced with feature + feature-incremental copy to gauge performance. There don't seem to be any major speed issues, and the previous RAM problems seem to be resolved. Will remove refs to the driver copy code.