osmcode / pyosmium

Python bindings for libosmium
https://osmcode.org/pyosmium
BSD 2-Clause "Simplified" License

Help needed: Node callback keeps reference to OSM object #187

Closed fsalerno-sdc closed 2 years ago

fsalerno-sdc commented 3 years ago

Hi. I have a rather big script that runs on planet dumps, which at some point (hours in) gives a "Node callback keeps reference to OSM object. This is not allowed." It doesn't seem to be consistent (I ran it on smaller dumps and sometimes it crashed, sometimes it ran to completion with no hiccup), but on something as big as planet.osm.pbf it seems to be pretty much guaranteed to happen before it finishes.

The issue is that the error message is extremely vague, and I can't find it in the source code of either pyosmium or osmium, so I have no idea what is causing it exactly.

Please help me identify the source of the problem, and if possible I think the exception message should be improved to be more precise.

I have a handler class that, among other things, handles Nodes; things that don't touch OSM objects have been redacted for brevity.

In order to save all the data I need as the object gets passed around, if it fits my criteria I turn it into a custom class that is mutable and takes just the attributes I need:

class OSMObjectMimic:
    """
    Mutable object with some of the attributes of an OSMObject, buildable from
    an existing instance.
    Turns `.tags` into a dictionary of strings.
    The presence of attributes that are specific to a subclass of OSMObject is
    conditional and should be checked with `hasattr()`.
    Allows marking objects for skipping by setting the `skip` attribute to `True`.
    """

    __slots__ = ["skip", "positive_id", "id", "tags", "location", "members"]

    def __init__(self, source: Optional[OSMObject] = None):
        self.skip = False
        """Whether to skip saving this element."""

        self.id = source and source.id
        self.positive_id = source and source.positive_id()

        self.tags = {}
        if source:
            for tag in source.tags:
                self.tags[tag.k] = tag.v

        if hasattr(source, "location"):
            self.location: Location = source.location

        if hasattr(source, "members"):
            self.members: RelationMemberList = source.members

class JointHandler(osmium.SimpleHandler):
    def __init__(self, *args, **kwargs):
        # Holds duplicates by file name; the set of int are IDs taken with OSMObject.positive_id
        self.duplicates: Dict[str, Set[int]] = defaultdict(set)
        self.check_duplicates()  # <- contents redacted, it just reads from a file

    def node(self, node: Node):
        tags = node.tags

        if tags.get("place") == "country":
            self.save_country(OSMObjectMimic(node))
            return

        if tags.get("place") in self.accepted_places and any(
            k.startswith("name") for k, _ in tags
        ):
            self.save_place(OSMObjectMimic(node))
            return

    def save_place(self, element: OSMObjectMimic):
        ... # Check for duplicates etc

        tags = element.tags

        country_id = None
        admin_id = None

        # Get location if possible
        longitude = ""
        latitude = ""
        if hasattr(element, "location"):
            location: Location = element.location
            if location.valid():
                longitude = str(location.lon)
                latitude = str(location.lat)

                # This calls a local instance of Photon to reverse-geocode for some info
                # I don't think it's relevant but I can show it if you think it helps
                geodata = get_geodata_from_photon(latitude, longitude)

                ... # Get stuff from geodata

        tags["admin_id"] = admin_id or ""
        tags["country_id"] = country_id or ""
        if not country_id:
            element.skip = True

        tags["longitude"] = longitude
        tags["latitude"] = latitude
        tags["capital"] = int(tags.get("capital", "") in ("yes", "2"))

        self.save_element(element, self.places_file_name)

    def save_country(self, element: OSMObjectMimic):
        ... # Very similar to save_place, it too calls save_element at the end

    def save_element(self, element: OSMObjectMimic, destination: str):
        if element.skip:
            return

        tags = element.tags

        data = []
        ... # Populate data with stuff from tags

        if not any(data):
            return

        # This is an open file pointer; the class acts like a context manager which at the end closes all files
        self.files[destination].write("\t".join(data) + "\n")

I'm not sure which part is holding a reference to an OSM object. I don't know how the check is made, nor why it only seems to error out after quite a while. I don't think I'm keeping any reference explicitly, since I refer to objects by ID and even copy stuff to a different object I own. Maybe the RelationMemberList I keep? Should I copy that more explicitly or something?

lonvia commented 3 years ago

The relation member list needs to be deep-copied. The deep-copy rules are the same as for any other Python object: all non-trivial types need deep copies. You can also have a look at these tests as a reference for which members of the OSM object cause the error.
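
A minimal sketch of that advice, assuming only the attribute names already used in this thread (`tag.k`, `tag.v`, `member.type`, `member.ref`, `member.role`): copy every value out as a plain `str` or `int` inside the callback and keep only the copies. The helper names `copy_tags` and `copy_members` are hypothetical, and the `SimpleNamespace` objects merely stand in for pyosmium objects so that the snippet runs without the library.

```python
from types import SimpleNamespace
from typing import Dict, Iterable, List, Tuple


def copy_tags(tags: Iterable) -> Dict[str, str]:
    """Hypothetical helper: copy a tag list into a plain dict of strings."""
    return {str(tag.k): str(tag.v) for tag in tags}


def copy_members(members: Iterable) -> List[Tuple[str, int, str]]:
    """Hypothetical helper: copy members into plain (type, ref, role) tuples."""
    return [(str(m.type), int(m.ref), str(m.role)) for m in members]


# Stand-ins for the objects a handler callback would receive (assumption:
# real tags and members expose the same attribute names).
fake_tags = [
    SimpleNamespace(k="place", v="town"),
    SimpleNamespace(k="name", v="Example"),
]
fake_members = [
    SimpleNamespace(type="n", ref=1, role="label"),
    SimpleNamespace(type="w", ref=2, role="outer"),
]

tags = copy_tags(fake_tags)
members = copy_members(fake_members)
print(tags)
print(members)
```

Only the dict and the list of tuples would be stored on the handler; the objects they were copied from are not kept anywhere.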

fsalerno-sdc commented 3 years ago

I had that feeling and tried copying the data I need using the following classes:

Disregard this code; please read the next comment.

@dataclass
class LocationMimic:
    _valid: bool
    lon: float
    lat: float

    def valid(self) -> bool:
        return self._valid

@dataclass
class RelationMemberMimic:
    type: str
    role: str
    ref: int

class OSMObjectMimic:
    """
    Mutable object with some of the attributes of an OSMObject, buildable from
    an existing instance.
    Turns `.tags` into a dictionary of strings.
    The presence of attributes that are specific to a subclass of OSMObject is
    conditional and should be checked with `hasattr()`.
    Allows marking objects for skipping by setting the `skip` attribute to `True`.
    """

    __slots__ = ["skip", "positive_id", "id", "tags", "location", "members"]

    def __init__(self, source: Optional[OSMObject] = None):
        self.skip = False
        """Whether to skip saving this element."""

        self.id = source and source.id
        self.positive_id = source and source.positive_id()

        self.tags = {}
        if source:
            for tag in source.tags:
                self.tags[tag.k] = tag.v

        if hasattr(source, "location"):
            location: Location = source.location
            self.location = LocationMimic(
                location.valid(),
                location.lon_without_check(),
                location.lat_without_check(),
            )

        if hasattr(source, "members"):
            members: RelationMemberList = source.members
            self.members: List[RelationMemberMimic] = [
                RelationMemberMimic(member.type, member.role, int(member.ref))
                for member in members if member.type != "w"
            ]

But I still got the error. At this point everything I'm keeping should be a primitive type, unless Tag.k, Tag.v, RelationMember.ref or RelationMember.type are objects and not strings/ints, but in that case, they do behave exactly like those so that would surprise me.

fsalerno-sdc commented 3 years ago

I've made some changes in hopes of lifting the error but so far it still occurs. I am really stuck, I'd appreciate it if you could take a look and give some insight as to what could be causing it. I'll include my whole class this time for good measure, hopefully you'll find it well documented enough to navigate easily, if not, let me know.

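# Imports needed by the code below. logger, ProgressReportMixin, MetaHandler
# and get_geodata_from_photon are defined elsewhere in the script.
import os
from collections import defaultdict
from dataclasses import dataclass
from typing import IO, Dict, List, Optional, Set

import osmium
from osmium.osm import Node, OSMObject, Relation

@dataclass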
class LocationMimic:
    _valid: bool
    lon: float
    lat: float

    def valid(self) -> bool:
        return self._valid

@dataclass
class RelationMemberMimic:
    type: str
    role: str
    ref: int

class OSMObjectMimic:
    """
    Mutable object with some of the attributes of an OSMObject, buildable from
    an existing instance.
    Turns `.tags` into a dictionary of strings.
    The presence of attributes that are specific to a subclass of OSMObject is
    conditional and should be checked with `hasattr()`.
    Allows marking objects for skipping by setting the `skip` attribute to `True`.
    """

    __slots__ = ["skip", "positive_id", "id", "tags", "location", "members"]

    def __init__(self, source: Optional[OSMObject] = None):
        self.skip = False
        """Whether to skip saving this element."""

        self.id = source and int(source.id)
        self.positive_id = source and int(source.positive_id())

        self.tags = {}
        if source:
            for tag in source.tags:
                self.tags[str(tag.k)] = str(tag.v)

        if hasattr(source, "location"):
            self.location = LocationMimic(
                bool(source.location.valid()),
                float(source.location.lon_without_check()),
                float(source.location.lat_without_check()),
            )

        if hasattr(source, "members"):
            self.members: List[RelationMemberMimic] = [
                RelationMemberMimic(str(member.type), str(member.role), int(member.ref))
                for member in source.members
                if member.type != "w"
            ]

# ProgressReportMixin is just there to keep track of how many items were processed,
#  and is responsible for the calls to self.advance_progress(), but it never touches any
#  OSM object so you can ignore those.
class JointHandler(ProgressReportMixin, osmium.SimpleHandler):
    # I've left this in just in case, but I think you can safely ignore init.
    def __init__(
        self,
        meta: MetaHandler,
        countries_file: str = "countries.tsv",
        admins_file: str = "admins.tsv",
        places_file: str = "places.tsv",
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)

        self.countries_file_name = countries_file
        self.admins_file_name = admins_file
        self.places_file_name = places_file

        # Headers are the names of the columns in the files I write to.
        # Keys (below) are the OSM tag keys to look for.
        name_headers = [
            "name_eng",
            "name_deu",
            "name_fra",
            "name_ita",
            "name_spa",
            "name_por",
        ]
        name_keys = [
            "name:en",
            "name:de",
            "name:fr",
            "name:it",
            "name:es",
            "name:pt",
        ]
        abbreviation_headers = [
            "abbr",
            *[f"abbr_{lang.partition('_')[2]}" for lang in name_headers],
        ]
        abbreviation_keys = [
            "short_name",
            *[f"short_name:{lang.partition('_')[2]}" for lang in name_keys],
        ]

        self.headers = {
            self.countries_file_name: [
                "country_id",
                *name_headers,
            ],
            self.admins_file_name: [
                *name_headers,
                *abbreviation_headers,
                "country_id",
            ],
            self.places_file_name: [
                "latitude",
                "longitude",
                "wikidata_id",
                *name_headers,
                "population",
                "admin_id",
                "country_id",
                "is_capital",
            ],
        }
        self.keys = {
            self.countries_file_name: [
                "country_id",
                *name_keys,
            ],
            self.admins_file_name: [
                *name_keys,
                *abbreviation_keys,
                "country_id",
            ],
            self.places_file_name: [
                "latitude",
                "longitude",
                "wikidata",
                *name_keys,
                "population",
                "admin_id",
                "country_id",
                "capital",
            ],
        }

        self.files: Dict[str, IO] = {}
        self.duplicates: Dict[str, Set[int]] = defaultdict(set)

        self.accepted_places = {
            "city",
            "town",
            "village",
            "hamlet",
        }
        """Types of place that will be collected, as listed here:
        https://wiki.openstreetmap.org/wiki/Key:place#Populated_settlements.2C_urban
        https://wiki.openstreetmap.org/wiki/Key:place#Populated_settlements.2C_urban_and_rural
        """

        # I do a preliminary run with a different handler that collects some metadata and passes it
        #  down to this handler. The following attributes are populated from there.
        # Meta
        self.country_by_label: Dict[int, int] = meta.country_by_label
        self.country_id_by_country_code: Dict[
            str, int
        ] = meta.country_id_by_country_code
        self.country_code_by_subarea: Dict[int, str] = meta.country_code_by_subarea
        self.country_by_subarea: Dict[int, int] = meta.country_by_subarea
        self.admin_by_name: Dict[str, int] = meta.admin_by_name

        logger.info("Initialised handler.")

    def __enter__(self) -> "JointHandler":
        self.load_file(self.countries_file_name)
        self.load_file(self.admins_file_name)
        self.load_file(self.places_file_name)

        return self

    def __exit__(self, *_, **__):
        for file in self.files.values():
            logger.debug(f"Closing file {file.name}")
            file.close()

    def load_file(self, destination: str):
        """Create a file with headers or load an existing one to skip duplicates."""
        if os.path.isfile(destination):
            logger.info(f"{destination} exists.")

            ends_in_newline = False
            total_duplicates = 0
            with open(destination, "r", encoding="utf-8") as fp:
                line = ""  # stays empty if the file has no lines at all
                for line in fp:
                    try:
                        self.duplicates[destination].add(int(line.partition("\t")[0]))
                        total_duplicates += 1
                    except ValueError:
                        continue

                if line.endswith("\n"):
                    ends_in_newline = True

            logger.info(f"Found {total_duplicates} duplicates.")

            dest_file = open(destination, "a", encoding="utf-8")
            if not ends_in_newline:
                dest_file.write("\n")
        else:
            dest_file = open(destination, "w", encoding="utf-8")
            dest_file.write("\t".join(["id", *self.headers[destination]]) + "\n")

        self.files[destination] = dest_file

    def node(self, node: Node):
        self.advance_progress()
        tags = node.tags

        if tags.get("place") == "country":
            self.save_country(OSMObjectMimic(node))

        elif tags.get("place") in self.accepted_places and any(
            k.startswith("name") for k, _ in tags
        ):
            self.save_place(OSMObjectMimic(node))

    def relation(self, relation: Relation):
        self.advance_progress()
        tags = relation.tags

        if tags.get("admin_level") == "4" and tags.get("boundary") == "administrative":
            self.save_admin(OSMObjectMimic(relation))

    def save_country(self, element: OSMObjectMimic):
        if element.positive_id in self.duplicates[self.countries_file_name]:
            logger.info(f"Found country {element.positive_id} is duplicate.")
            return

        element.tags["country_id"] = (
            self.country_by_label.get(element.id)
            or self.country_by_label.get(element.positive_id)
            or self.country_id_by_country_code.get(element.tags.get("ISO3166-1:alpha2"))
            or self.country_id_by_country_code.get(element.tags.get("ISO3166-1"), "")
        )
        logger.debug(
            f"Country {element.positive_id} has country id {element.tags['country_id']}"
        )

        self.save_element(element, self.countries_file_name)

    def save_admin(self, element: OSMObjectMimic):
        if element.positive_id in self.duplicates[self.admins_file_name]:
            logger.info(f"Found admin {element.positive_id} is duplicate.")
            return

        try:
            ccode = self.country_code_by_subarea.get(
                element.positive_id
            ) or self.country_code_by_subarea.get(element.id)
            country_id = self.country_id_by_country_code[ccode]
            logger.debug(f"Admin {element.positive_id} has country id {country_id}")
        except KeyError:
            country_id = self.country_by_subarea.get(
                element.id
            ) or self.country_by_subarea.get(element.positive_id, "")
            logger.debug(f"Admin {element.positive_id} has country id {country_id}")

        element.tags["country_id"] = country_id

        if not country_id:
            logger.debug(f"Admin {element.positive_id} has no country id. Skipping.")
            return

        self.save_element(element, self.admins_file_name)

    def save_place(self, element: OSMObjectMimic):
        if element.positive_id in self.duplicates[self.places_file_name]:
            logger.info(f"Found place {element.positive_id} is duplicate.")
            return

        tags = element.tags

        country_id = None
        admin_id = None

        # Get location if possible
        longitude = ""
        latitude = ""
        if hasattr(element, "location"):
            location = element.location
            if location.valid():
                longitude = str(location.lon)
                latitude = str(location.lat)

                geodata = get_geodata_from_photon(latitude, longitude)

                admin_name = geodata.get("state", "").lower()
                admin_id = self.admin_by_name.get(admin_name)
                country_id = self.country_by_subarea.get(admin_id)

                # Fallback: if admin not available get at least country using country code
                if not country_id:
                    country_code = geodata.get("countrycode", "")
                    country_id = self.country_id_by_country_code.get(country_code)

        tags["admin_id"] = admin_id or ""
        tags["country_id"] = country_id or ""
        if not country_id:
            element.skip = True

        tags["longitude"] = longitude
        tags["latitude"] = latitude
        # Capital value must be interpreted/converted
        tags["capital"] = int(tags.get("capital", "") in ("yes", "2"))

        self.save_element(element, self.places_file_name)

    def save_element(self, element: OSMObjectMimic, destination: str):
        if element.skip:
            logger.info(f"Element {element.positive_id} is marked as skipped.")
            return

        tags = element.tags

        data = []
        for key in self.keys[destination]:
            value = tags.get(key, "")

            if not value and key.startswith("name"):
                # Default to general/English name if not found
                value = tags.get("name:en") or tags.get("name", "")

            data.append(value)

        # Avoid recording line if empty
        if not any(data):
            logger.warning(f"Found item {element.positive_id} has no useful data.")
            return

        to_save = [str(item) for item in [element.positive_id, *data]]
        formatted = "\t".join(to_save)

        self.files[destination].write(formatted + "\n")

fsalerno-sdc commented 3 years ago

The exception also comes directly from .apply_file(), so there's no way for me to catch it from inside a handler to look into the data that causes it, nor can I simply choose to ignore the problematic node and keep going. This is probably beyond the control of the Python bindings but if something can be done to give more control to the library user, it would be appreciated.
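
One possible reading of why the error surfaces from `.apply_file()` rather than inside the handler, offered as an assumption and not as a description of pyosmium's internals: a check of this kind can only run after the callback has returned, in the code that called it. The sketch below illustrates the idea with CPython's `sys.getrefcount` on a stand-in object. `FakeNode`, `clean_callback`, `leaky_callback` and `keeps_reference` are all hypothetical names.

```python
import sys


class FakeNode:
    """Hypothetical stand-in for an OSM object handed to a callback."""

    def __init__(self, node_id):
        self.id = node_id


kept = []


def clean_callback(n):
    # Copies a plain int out of the object; nothing refers to `n` afterwards.
    kept.append(int(n.id))


def leaky_callback(n):
    # Stores the object itself, so a reference outlives the callback.
    kept.append(n)


def keeps_reference(callback, obj):
    """Return True if `callback` left an extra reference to `obj` behind."""
    before = sys.getrefcount(obj)
    callback(obj)
    # The comparison happens here, in the caller, after the callback is done.
    return sys.getrefcount(obj) > before


node = FakeNode(42)
clean_result = keeps_reference(clean_callback, node)
leaky_result = keeps_reference(leaky_callback, node)
print(clean_result, leaky_result)
```

Under that assumption, a `try`/`except` inside `node()` never sees the error, because nothing has gone wrong by the time the callback body finishes.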

fsalerno-sdc commented 3 years ago

@lonvia I'm sorry to bother you so much but this problem is frustrating me quite a bit and costing me weeks of extra work; at this point I've resorted to manual garbage collection but even that doesn't seem to work.

If you don't have time to look at my code, could I ask about the apply_buffer() method of SimpleHandler? By the available documentation I'm not sure how to use it exactly starting from an .osm.pbf file. Do I need to feed it PrimitiveBlocks after reading them with a protobuffer parser? Would that work, or how else should I approach this? If I could use apply_buffer() instead of apply_file() I could at least skip problematic nodes and complete the rest of the task, instead of having to stop completely. Thank you for your patience.

lonvia commented 2 years ago

I'm closing this because it is simply not possible to debug such a complex script. I can't even find the call to apply_file(), so it is not even clear how the script is used. If you can provide a simple (and complete) example that triggers the issue, please open a new issue.

Arzintu commented 2 years ago

Ran into the error recently myself.

Problem: The problem arises when the OSM node is passed on to another function. I processed data from nodes, ways, and relations the same way, so I created a helper function, extract_data.

Using a simplified example:

def node(self, n):
    self.extract_data(n)

Error: Node callback keeps a reference to OSM object

Solution: For some reason, the OSM node data seems to linger when it is passed into another function, so the fix is to delete it after processing the node. I haven't dug into why this occurs, nor do I plan to; I'm not enough of a coding wizard for that. The fix was satisfactory enough for my purposes, however.

def node(self, n):
    self.extract_data(n)
    del n

lonvia commented 2 years ago

Please don't post on closed issues. There is an open issue for this at #208. Also, please provide a completely reproducible code example. In this particular case: provide an implementation of extract_data() where you see the behaviour.