Open leendertvanwolfswinkel opened 2 months ago
I already experimented a bit with this and wrote this wrapper around geodiff
that outputs the changes as vector layers:
import csv
import pathlib
import tempfile
from typing import List, Dict
import pygeodiff
import base64
import json
from osgeo import ogr, osr
from shapely import wkb
ogr.UseExceptions()
WKT_OGR_GEOMETRY_TYPES = {
"GEOMETRY": ogr.wkbUnknown,
"NONE": ogr.wkbNone,
"POINT": ogr.wkbPoint,
"LINESTRING": ogr.wkbLineString,
"POLYGON": ogr.wkbPolygon,
"MULTIPOINT": ogr.wkbMultiPoint,
"MULTILINESTRING": ogr.wkbMultiLineString,
"MULTIPOLYGON": ogr.wkbMultiPolygon,
}
OGR_FIELD_TYPES = {
"text": ogr.OFTString,
"integer": ogr.OFTInteger,
"double": ogr.OFTReal,
}
class Schema:
def __init__(self, geodiff:pygeodiff.GeoDiff, geopackage_path: str | pathlib.Path):
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_schema_json_path = temp_file.name
geodiff.schema(driver="sqlite", driver_info="", src=str(geopackage_path), json=temp_schema_json_path)
with open(temp_schema_json_path, 'r') as schema_json:
self._dict = json.load(schema_json)
def _schema_for_table(self, table: str) -> Dict:
for table_schema in self._dict["geodiff_schema"]:
if table_schema["table"] == table:
return table_schema
def columns(self, table):
table_schema = self._schema_for_table(table)
return table_schema["columns"]
def column_names(self, table: str, exclude_geometry: bool = False) -> List[str]:
table_schema = self._schema_for_table(table)
if exclude_geometry:
return [
column_schema["name"] for column_schema in table_schema["columns"]
if column_schema.get("geometry") is None
]
else:
return [column_schema["name"] for column_schema in table_schema["columns"]]
def geometry_type(self, table: str) -> Dict | None:
"""
Returns None if table has no geometry
"""
table_schema = self._schema_for_table(table)
geometry_schema = None # in case table has no columns and no geometry (is this possible?)
for column_schema in table_schema.get("columns"):
geometry_schema = column_schema.get("geometry")
if geometry_schema:
break
if geometry_schema is None:
return None
else:
return geometry_schema.get("type")
def crs(self, table: str) -> Dict:
"""
Returns None if table has no geometry_type
"""
table_schema = self._schema_for_table(table)
return table_schema.get('crs')
def create_layer(
self,
table: str,
datasource: ogr.DataSource,
layer_name: str,
fields: List[ogr.FieldDefn] = None
) -> ogr.Layer:
wkt_geometry_type = self.geometry_type(table) or "NONE"
ogr_geometry_type = WKT_OGR_GEOMETRY_TYPES[wkt_geometry_type]
if wkt_geometry_type != "NONE":
crs_schema = self.crs(table)
srs = osr.SpatialReference()
srs.ImportFromWkt(crs_schema.get("wkt"))
else:
srs = None
layer = datasource.CreateLayer(layer_name, srs, ogr_geometry_type) # Geometry type set to unknown
if layer is None:
raise RuntimeError(f"Failed to create layer '{table}' in the GeoPackage.")
if fields:
for field_defn in fields:
layer.CreateField(field_defn)
else:
for column_schema in self.columns(table):
if column_schema["type"] != "geometry":
ogr_field_type = OGR_FIELD_TYPES[column_schema["type"]]
layer.CreateField(ogr.FieldDefn(column_schema["name"], ogr_field_type))
return layer
def get_geom(
datasource_path: str | pathlib.Path,
layer_name: str,
feature_id: int,
format: str,
) -> str:
"""
Get the WKT (Well-Known Text) geometry from a specified feature in a layer of the given OGR datasource.
Args:
datasource (ogr.DataSource): The OGR data source (e.g., GeoPackage or Shapefile).
layer_name (str): The name of the layer from which to get the feature.
feature_id (int): The ID of the feature whose geometry should be retrieved.
Returns:
str: The WKT representation of the geometry.
"""
datasource = ogr.Open(datasource_path)
# Get the layer by name from the datasource
layer = datasource.GetLayerByName(layer_name)
if layer is None:
raise ValueError(f"Layer '{layer_name}' not found in the datasource.")
# Get the feature from the layer by its feature ID
feature = layer.GetFeature(feature_id)
if feature is None:
raise ValueError(f"Feature with ID '{feature_id}' not found in layer '{layer_name}'.")
# Get the geometry reference from the feature
geometry = feature.GetGeometryRef()
if geometry is None:
raise ValueError(f"Feature with ID '{feature_id}' does not have a valid geometry.")
if format == "wkt":
return geometry.ExportToWkt()
elif format == "wkb":
return geometry.ExportToWkb()
else:
raise ValueError("format must be one of 'wkt', 'wkb'")
class GeoDiffer:
"""Wrapper around pygeodiff.GeoDiff. Adds some utilities like exporting changesets to GIS formats"""
def __init__(
self,
geodiff: pygeodiff.GeoDiff,
base: str | pathlib.Path,
modified: str | pathlib.Path
):
"""
:param geodiff: geodiff for which to parse the changes
:param base: Path to base geopackage
:param modified: Path to modified geopackage
"""
self.geodiff = geodiff
self.updates = dict()
self.deletes = dict()
self.inserts = dict()
self.base = base
self.base_schema = Schema(geodiff=self.geodiff, geopackage_path=base)
self.modified = modified
self.modified_schema = Schema(geodiff=self.geodiff, geopackage_path=modified)
self._changeset_path = None
self.geodiff_dict = None
@property
def changeset_path(self):
"""
Path to the changeset (.diff) file for the GeoDiff.
Returns None if create_changeset() has not been run yet
"""
return self._changeset_path
def create_changeset(self):
"""Sets self._changeset_path"""
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
self._changeset_path = temp_file.name
self.geodiff.create_changeset(
base=str(self.base),
modified=str(self.modified),
changeset=self._changeset_path
)
def list_changes(self):
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_json_path = temp_file.name
self.geodiff.list_changes(changeset=str(self._changeset_path), json=temp_json_path)
with open(temp_json_path, 'r') as json_file:
self.geodiff_dict = json.load(json_file)
def export_geopackage_geometry(self, geom: str, export_format: str):
"""
Export a geopackage geometry to WKT or WKB
:param geom: string representation of a base64-encoded wkb geometry with geopackage headers
:param export_format: 'wkt' or 'wkb'
"""
binary_geometry = base64.b64decode(geom)
wkb_geom = self.geodiff.create_wkb_from_gpkg_header(binary_geometry)
if export_format == 'wkb':
return wkb_geom
elif export_format == 'wkt':
shapely_geom = wkb.loads(wkb_geom)
return shapely_geom.wkt
else:
raise ValueError("export_format must be one of 'wkt', 'wkb'")
def parse(self):
for change_set in self.geodiff_dict["geodiff"]:
if change_set["type"] == "update":
self.parse_updated(change_set)
elif change_set["type"] == "delete":
self.parse_deleted(change_set)
elif change_set["type"] == "insert":
self.parse_inserted(change_set)
def parse_updated(self, change_set: Dict):
"""
Create a {table_name: [changes]} dict.
Each change in [changes] is a dict with:
- 'primary_key': <primary key>
- 'geometry': <geometry (before update)>
- 'changes': a newline-separated list of "<column>: <old value> -> <new value>" strings
"""
table = change_set["table"]
if self.updates.get(table) is None:
self.updates[table] = []
primary_key = change_set["changes"][0]["old"]
geom = get_geom(
datasource_path=self.base,
layer_name=table,
feature_id=primary_key,
format="wkb"
) if self.base_schema.geometry_type(table) else None
changes = []
columns = self.base_schema.columns(table)
for change in change_set["changes"][1:]:
column_schema = columns[change['column']]
changes.append(f"{column_schema['name']}: {change['old']} -> {change['new']}")
changes_str = "\n".join(changes)
self.updates[table].append(
{
"primary_key": primary_key,
"geometry": geom,
"changes": changes_str
}
)
def parse_deleted(self, change_set: Dict):
"""
Adds a {"geometry": <geometry>, "values": [<values>]} dict to the self.deletes[table] list
"""
table = change_set["table"]
if self.deletes.get(table) is None:
self.deletes[table] = []
values = []
geometry = None
columns = self.base_schema.columns(table)
for change in change_set["changes"]:
column_schema = columns[change['column']]
if column_schema["type"] == "geometry":
geometry = self.export_geopackage_geometry(change["old"], "wkb")
else:
values.append(change["old"])
self.deletes[table].append({"values": values, "geometry": geometry})
def parse_inserted(self, change_set: Dict):
"""
Adds a {"geometry": <geometry>, "values": [<values>]} dict to the self.inserts[table] list
"""
table = change_set["table"]
if self.inserts.get(table) is None:
self.inserts[table] = []
values = []
geometry = None
columns = self.modified_schema.columns(table)
for change in change_set["changes"]:
column_schema = columns[change['column']]
if column_schema["type"] == "geometry":
geometry = self.export_geopackage_geometry(change["new"], "wkb")
else:
values.append(change["new"])
self.inserts[table].append({"values": values, "geometry": geometry})
def to_ogr(self, path: pathlib.Path, driver_name: str):
"""
Write differences to a gis vector file or directory of files
:param path: interpreted as a directory if driver does not support multiple layers
:param driver_name: a driver short name (see https://gdal.org/en/latest/drivers/vector/index.html)
"""
driver = ogr.GetDriverByName(driver_name)
if driver is None:
raise ValueError(f"Driver '{driver_name}' not found.")
else:
multi_layer_support = driver.GetMetadataItem("DCAP_MULTIPLE_VECTOR_LAYERS") == 'YES'
if multi_layer_support:
if path.exists():
datasource = driver.Open(str(path), 1) # Open in update mode
else:
datasource = driver.CreateDataSource(str(path))
if datasource is None:
raise RuntimeError(f"Could not create GeoPackage at {path}")
else:
path.mkdir(parents=True, exist_ok=True)
# updated
for table, rows in self.updates.items():
print(f"{table} has {len(rows)} updated rows")
layer_name = table + "_updated"
if not multi_layer_support:
file_extension = driver.GetMetadataItem("DMD_EXTENSIONS").split(" ")[0]
datasource = driver.CreateDataSource((str(path / layer_name) + f".{file_extension}"))
fields = [
ogr.FieldDefn("primary_key", ogr.OFTInteger),
ogr.FieldDefn("changes", ogr.OFTString)
]
layer = self.base_schema.create_layer(
table=table,
datasource=datasource,
layer_name=layer_name,
fields=fields
)
layer_defn = layer.GetLayerDefn()
for row in rows:
feature = ogr.Feature(layer_defn)
geom_wkb = row.get("geometry")
if geom_wkb:
geom = ogr.CreateGeometryFromWkb(geom_wkb)
feature.SetGeometry(geom)
feature.SetField("primary_key", row.get("primary_key"))
feature.SetField("changes", row.get("changes"))
layer.CreateFeature(feature)
# Destroy the feature to free resources
feature = None
# deleted
for table, rows in self.deletes.items():
print(f"{table} has {len(rows)} deleted rows")
layer_name = table + "_deleted"
if not multi_layer_support:
file_extension = driver.GetMetadataItem("DMD_EXTENSIONS").split(" ")[0]
datasource = driver.CreateDataSource((str(path / layer_name) + f".{file_extension}"))
layer = self.base_schema.create_layer(
table=table,
datasource=datasource,
layer_name=layer_name
)
layer_defn = layer.GetLayerDefn()
column_names = self.base_schema.column_names(table, exclude_geometry=True)
for row in rows:
feature = ogr.Feature(layer_defn)
geom_wkb = row.get("geometry")
if geom_wkb:
geom = ogr.CreateGeometryFromWkb(geom_wkb)
feature.SetGeometry(geom)
for i, value in enumerate(row["values"]):
feature.SetField(column_names[i], value)
layer.CreateFeature(feature)
# Destroy the feature to free resources
feature = None
# inserted
for table, rows in self.inserts.items():
print(f"{table} has {len(rows)} inserted rows")
layer_name = table + "_inserted"
if not multi_layer_support:
file_extension = driver.GetMetadataItem("DMD_EXTENSIONS").split(" ")[0]
datasource = driver.CreateDataSource((str(path / layer_name) + f".{file_extension}"))
layer = self.base_schema.create_layer(
table=table,
datasource=datasource,
layer_name=layer_name
)
layer_defn = layer.GetLayerDefn()
column_names = self.modified_schema.column_names(table, exclude_geometry=True)
for row in rows:
feature = ogr.Feature(layer_defn)
geom_wkb = row.get("geometry")
if geom_wkb:
geom = ogr.CreateGeometryFromWkb(geom_wkb)
feature.SetGeometry(geom)
for i, value in enumerate(row["values"]):
feature.SetField(column_names[i], value)
layer.CreateFeature(feature)
# Destroy the feature to free resources
feature = None
if __name__ == "__main__":
DATA_DIR = pathlib.Path("diff_data")
# Define scenario's
original = DATA_DIR / "pipes_ori.gpkg"
edit = DATA_DIR / "pipes_edit.gpkg"
my_geodiffer = GeoDiffer(pygeodiff.GeoDiff(), base=original, modified=edit)
my_geodiffer.create_changeset()
my_geodiffer.list_changes()
my_geodiffer.parse()
# my_geodiffer.to_ogr(DATA_DIR / "test.gpkg", driver_name="GPKG")
my_geodiffer.to_ogr(DATA_DIR / "geojson_output", driver_name="GeoJSON")