faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Possible issue with open method when using rocksdict #573

Closed fonty422 closed 11 months ago

fonty422 commented 11 months ago

Steps to reproduce

Trying to run with store=rocksdb:// on Windows.

Expected behavior

I expect it can store data in tables and recover quickly when changelogs are long

Actual behavior

Errors with [^----Store: rocksdb:]: DB for partition 2 retries timed out.

I tracked it down to the resource module (used in faust/utils/platforms), which is Unix-only, so on Windows the _max_open_files variable defaults to None. That leads to a crash when faust/stores/rocksdb.py tries to set _max_open_files on db.options, which does not accept None. Setting this default to 1 instead of None resolved that issue, but I then hit a second one: the open method references rocksdict.AccessType.ReadWrite, which doesn't exist. AccessType has no ReadWrite attribute, but it does have a read_write method, so I replaced the reference with rocksdict.AccessType.read_write() and it worked.

I'm fairly certain there is no issue here except that I'm trying to use this with Windows. But I did get it working, so I'm not sure whether there's something here to explore or this is a big don't.

fonty422 commented 11 months ago

Here's the solution I implemented. Let me know if this is bad/wrong and whether it's a fix that needs to be implemented.

def open(self, path: Path, *, read_only: bool = False) -> DB:
        """Open RocksDB database using this configuration."""
        if self.use_rocksdict:
            db_options = self.as_options()
            db_options.set_db_paths(
                [rocksdict.DBPath(str(path), self.target_file_size_base)]
            )
            db_access_type = (
                # rocksdict.AccessType.ReadWrite <== This line changes
                rocksdict.AccessType.read_write()
                if self.ttl is None
                else rocksdict.AccessType.with_ttl(self.ttl)
            )
            db = DB(str(path), options=self.as_options(), access_type=db_access_type)
            db.set_read_options(rocksdict.ReadOptions())
            return db
        else:
            return rocksdb.DB(str(path), self.as_options(), read_only=read_only)

To resolve the NoneType nfiles error, we modify the platforms module to check for Windows (which can't use the resource module) so that it returns 512 instead of None:

"""Platform/OS utilities."""
import platform
import subprocess  # nosec: B404
from typing import Optional

def max_open_files() -> Optional[int]:
    """Return max number of open files, or :const:`None`."""
    try:
        from resource import RLIM_INFINITY, RLIMIT_NOFILE, getrlimit
    except ImportError:
        # Added: return a default on Windows, where resource is unavailable.
        if platform.system() == "Windows":
            return 512
        else:
            return None
    else:
        _, hard_limit = getrlimit(RLIMIT_NOFILE)
        if hard_limit == RLIM_INFINITY:
            # macOS bash always returns infinity, even though there
            # is an actual system limit.
            if platform.system() == "Darwin":
                output = subprocess.check_output(  # nosec
                    [
                        "sysctl",
                        "-q",
                        "kern.maxfilesperproc",
                    ]
                )
                # $ sysctl -q kern.maxfilesperproc
                # kern.maxfilesperproc: 24576
                _, _, svalue = output.decode().partition(":")
                return int(svalue.strip())
            return None
        return hard_limit

fonty422 commented 11 months ago

This seems to suggest that the default limit on Windows is 512.
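An alternative to hard-coding a number would be to skip the setting whenever no limit is known. This is a minimal sketch, not the project's fix. It assumes the options object has a set_max_open_files(nfiles) setter, which the 'nfiles' error message hints at. StubOptions and apply_max_open_files are hypothetical names, used only so the example runs without rocksdict installed.

```python
from typing import Optional


class StubOptions:
    """Hypothetical stand-in for a RocksDB options object (not rocksdict's class)."""

    def __init__(self) -> None:
        self.max_open_files: Optional[int] = None

    def set_max_open_files(self, nfiles: int) -> None:
        # Mimic a binding that rejects None, like the error seen in this thread.
        if not isinstance(nfiles, int):
            raise TypeError("argument 'nfiles' must be an integer")
        self.max_open_files = nfiles


def apply_max_open_files(options: StubOptions, limit: Optional[int]) -> StubOptions:
    """Set the limit only when one is known; otherwise leave the default alone."""
    if limit is not None:
        options.set_max_open_files(limit)
    return options


configured = apply_max_open_files(StubOptions(), 512)
untouched = apply_max_open_files(StubOptions(), None)
```

With this guard, a None result from max_open_files() would leave the store on the library's own default instead of crashing.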

clafoutis42 commented 11 months ago

I'm experiencing the exact same issue

fonty422 commented 11 months ago

Are you also using Windows, or Linux? If Linux, then the issue with ReadWrite might be universal and is a bug that needs fixing. The NoneType error for nfiles is definitely Windows-only, so to fix it you'd need to replace the code in faust/utils/platforms.py with the second snippet. Did using my example above resolve the issue for you? I did have an error in there, which I've since fixed, so you might need to copy/paste it again.

steven-herchak commented 11 months ago

I'm seeing the exact same issue with Linux.

fonty422 commented 11 months ago

So you're seeing this then?

[INFO] Executing _on_partitions_assigned
[INFO] opening partition 2 for gen id 3 app id 3 #<== Numbers here might be different each time
[WARNING] [^----Store: rocksdb:<table_name>]: DB for partition 2 retries timed out
[INFO] Closing rocksdb on stop
[ERROR] [^-App]: Crashed reason=TypeError("argument 'nfiles': 'NoneType' object cannot be interpreted as an integer")

That would be interesting, as it would imply the issue for you is coming from the platforms module. I'd suggest adding some debug statements in there (just for yourself) so you can see why it's returning the None. If it's the ImportError then ensure you have the resource package installed (although I think that's default for Linux 🤷). If it's returning None in the next section then that's strange and requires someone who properly understands this to assist - I'm no pro with this stuff but can dabble enough to duct tape a temporary solution.
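For the debug statements, something like the following could show which branch produces the None. It is a simplified sketch of the platforms logic (it leaves out the macOS sysctl fallback), and explain_max_open_files is a hypothetical helper name, not part of faust.

```python
import platform
from typing import Optional, Tuple


def explain_max_open_files() -> Tuple[Optional[int], str]:
    """Return the hard RLIMIT_NOFILE value plus the reason it was chosen."""
    try:
        from resource import RLIM_INFINITY, RLIMIT_NOFILE, getrlimit
    except ImportError:
        # The resource module is Unix-only, so this branch is expected on Windows.
        return None, f"resource module unavailable on {platform.system()}"
    _, hard_limit = getrlimit(RLIMIT_NOFILE)
    if hard_limit == RLIM_INFINITY:
        # No finite hard limit is configured for this process.
        return None, "hard limit is RLIM_INFINITY"
    return hard_limit, "hard limit read from getrlimit"


value, reason = explain_max_open_files()
print(f"max_open_files -> {value!r} ({reason})")
```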

steven-herchak commented 11 months ago

Sorry, I realise I didn't specify that I was running into the second issue in the OP:

  File "/usr/local/lib/python3.9/site-packages/faust/stores/rocksdb.py", line 479, in _db_for_partition
    return self._dbs[partition]
KeyError: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/faust/app/base.py", line 1775, in _on_partitions_assigned
    await T(self.tables.on_rebalance)(
  File "/usr/local/lib/python3.9/site-packages/faust/utils/tracing.py", line 133, in corowrapped
    await_ret = await ret
  File "/usr/local/lib/python3.9/site-packages/faust/tables/manager.py", line 193, in on_rebalance
    await T(table.on_rebalance)(
  File "/usr/local/lib/python3.9/site-packages/faust/utils/tracing.py", line 133, in corowrapped
    await_ret = await ret
  File "/usr/local/lib/python3.9/site-packages/faust/tables/base.py", line 584, in on_rebalance
    await self.data.on_rebalance(assigned, revoked, newly_assigned, generation_id)
  File "/usr/local/lib/python3.9/site-packages/faust/stores/rocksdb.py", line 562, in on_rebalance
    await self.assign_partitions(self.table, newly_assigned, generation_id)
  File "/usr/local/lib/python3.9/site-packages/faust/stores/rocksdb.py", line 601, in assign_partitions
    await self._try_open_db_for_partition(
  File "/usr/local/lib/python3.9/site-packages/faust/stores/rocksdb.py", line 620, in _try_open_db_for_partition
    return self._db_for_partition(partition)
  File "/usr/local/lib/python3.9/site-packages/faust/stores/rocksdb.py", line 481, in _db_for_partition
    db = self._dbs[partition] = self._open_for_partition(partition)
  File "/usr/local/lib/python3.9/site-packages/faust/stores/rocksdb.py", line 486, in _open_for_partition
    return self.rocksdb_options.open(
  File "/usr/local/lib/python3.9/site-packages/faust/stores/rocksdb.py", line 146, in open
    rocksdict.AccessType.ReadWrite
AttributeError: type object 'builtins.AccessType' has no attribute 'ReadWrite'

Additional info: I only see this using v0.10.17. As soon as I install v0.10.16 this problem goes away.

fonty422 commented 11 months ago

Ah, so it's the ReadWrite one. Try replacing line 146 in faust/stores/rocksdb.py with rocksdict.AccessType.read_write(). I'd copy the line and comment out the original so you can easily restore it.

fonty422 commented 11 months ago

Oooo. Interesting that it goes away with v0.10.16. But I also just noticed that while you do also have the ReadWrite error, you start with a KeyError, so it might not be related to my issue at all.

steven-herchak commented 11 months ago

> Try replacing line 146 in faust/stores/rocksdb.py with rocksdict.AccessType.read_write().

Yes, this resolved the error.
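If both spellings need to be tolerated, a small helper could try the method form first and fall back to the attribute form. This is only a sketch: resolve_read_write, MethodStyle and AttributeStyle are hypothetical names, and the stub classes stand in for rocksdict.AccessType so the example runs without the library.

```python
from typing import Any


def resolve_read_write(access_type_cls: Any) -> Any:
    """Return a read/write access type from either API spelling."""
    factory = getattr(access_type_cls, "read_write", None)
    if callable(factory):
        # Spelling that worked in this thread: a factory method.
        return factory()
    # Spelling used by the failing line: a plain class attribute.
    return access_type_cls.ReadWrite


class MethodStyle:
    """Stub exposing only the read_write() method form."""

    @staticmethod
    def read_write() -> str:
        return "rw-from-method"


class AttributeStyle:
    """Stub exposing only the ReadWrite attribute form."""

    ReadWrite = "rw-from-attribute"


method_result = resolve_read_write(MethodStyle)
attribute_result = resolve_read_write(AttributeStyle)
```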

clafoutis42 commented 11 months ago

> you start with a KeyError, so it might not be related to my issue at all.

Indeed, I also get the KeyError, but it is part of the expected behaviour with rocksdict when the database for a partition has not yet been opened by the library (when faust starts up and begins reading the data persisted by rocksdb).
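The traceback is consistent with that reading. The lookup at line 479 misses the cache, and the real failure occurs while opening the database at line 481, so Python reports the KeyError only as context for the AttributeError. Here is a minimal sketch of that lookup-then-open pattern. PartitionStore is a hypothetical class that mirrors the method names in the traceback, and its open step is made to fail on purpose.

```python
from typing import Dict


class PartitionStore:
    """Hypothetical cache mimicking the lookup-then-open pattern in the traceback."""

    def __init__(self) -> None:
        self._dbs: Dict[int, str] = {}

    def _open_for_partition(self, partition: int) -> str:
        # Stand-in for the real open call, failing like the ReadWrite lookup did.
        raise AttributeError("no attribute 'ReadWrite'")

    def _db_for_partition(self, partition: int) -> str:
        try:
            return self._dbs[partition]
        except KeyError:
            # A cache miss is expected on first access; open and remember the handle.
            db = self._dbs[partition] = self._open_for_partition(partition)
            return db


try:
    PartitionStore()._db_for_partition(0)
except AttributeError as exc:
    # Keep a reference; the name bound by "as" is cleared after the block.
    caught = exc
```

Here caught.__context__ holds the KeyError, which is why both exceptions appear in the log even though only the AttributeError is the actual failure.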

wbarnha commented 11 months ago

Sorry I didn't get to address this sooner. I'll have this patched ASAP.