AKST / Australian-Address-Boundaries-Land-Property-Price-Database

This is a database of geographic boundaries and addresses, as well as land and property data (mostly NSW).
MIT License

Synchronise access to Cache to prevent corruption of state with IPC #24

Open AKST opened 2 months ago

AKST commented 2 months ago

The problem

Currently, if two processes (in particular subprocesses) that use the CachedClientSession are running and both start adding items to the cache, one of those processes is going to lose the references to the data it fetched, and any file it saved to _out_cache will be orphaned. Not good.
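To make the failure concrete, here is a minimal sketch of the lost update. It assumes, hypothetically, that the cache state is a JSON index file that each process reads, modifies and rewrites wholesale; the actual CachedClientSession format isn't shown here. The two processes' steps are interleaved by hand in a single process so the outcome is deterministic.

```python
import json
import os
import tempfile


# Hypothetical stand-in for the cache's on-disk state: a JSON file
# mapping URL -> file name under _out_cache.
def load_state(path):
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        return json.load(f)


def save_state(path, state):
    with open(path, "w") as f:
        json.dump(state, f)


def demo_lost_update(path):
    # Both "processes" read the state before either has written it back.
    state_a = load_state(path)
    state_b = load_state(path)
    # Each fetches a different URL and records where it saved the response.
    state_a["https://example.com/a"] = "_out_cache/a.json"
    state_b["https://example.com/b"] = "_out_cache/b.json"
    # Last writer wins: B's write replaces A's, so A's entry is lost and
    # _out_cache/a.json is now orphaned.
    save_state(path, state_a)
    save_state(path, state_b)
    return load_state(path)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(demo_lost_update(os.path.join(tmp, "cache_state.json")))
        # prints {'https://example.com/b': '_out_cache/b.json'}
```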

Solution

There are other approaches, like making the data a flat CSV and allowing new data to just be appended to it. That sounds pretty reasonable tbh, but I want to have fun here.
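For comparison, here is a sketch of that append-only alternative, assuming a hypothetical index file with one (url, filename) row per cache entry. Opening the file in append mode means every write lands at the current end of the file, so one process can't overwrite rows another process already wrote. Rows from concurrent writers stay intact only if each row reaches the OS as a single write, which is likely for short rows but not guaranteed (for example on network filesystems).

```python
import csv
import os
import tempfile


# Hypothetical layout: one row per cache entry, (url, filename).
def append_entry(path, url, filename):
    # Mode "a" opens with O_APPEND, so each write goes to the current end
    # of the file instead of replacing what another process wrote.
    with open(path, "a", newline="") as f:
        csv.writer(f).writerow([url, filename])


def load_index(path):
    # Replay the whole log; later rows win if a URL appears twice.
    index = {}
    if not os.path.exists(path):
        return index
    with open(path, newline="") as f:
        for url, filename in csv.reader(f):
            index[url] = filename
    return index


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "cache_index.csv")
        append_entry(path, "https://example.com/a", "_out_cache/a.json")
        append_entry(path, "https://example.com/b", "_out_cache/b.json")
        print(load_index(path))
        # prints {'https://example.com/a': '_out_cache/a.json', 'https://example.com/b': '_out_cache/b.json'}
```

Unlike the read-modify-write pattern, neither writer needs to know what the other wrote, which is why this avoids the lost update without any coordination.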

So what I'm going to do is create some kind of Erlang-style IPC layer where a single process "owns" the responsibility of reading and writing the cache state, as well as performing the network requests: some kind of HTTP daemon.
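To make the "owner" idea concrete, here is a minimal sketch of the state such a daemon might hold, with the IPC left out. The class name CacheOwner, the request shape {"action": "get", "url": ...} and the injected fetch callable are all hypothetical; the real daemon would perform the HTTP request itself and write responses to _out_cache rather than keep them in memory.

```python
from typing import Callable, Dict


class CacheOwner:
    """Hypothetical state held by the daemon that "owns" the cache.

    Only this object reads or writes the index and performs fetches, so
    there is never a second writer whose update could be lost.
    """

    def __init__(self, fetch: Callable[[str], bytes]):
        # fetch is injected so the sketch runs without a network; the real
        # daemon would make the HTTP request here.
        self.fetch = fetch
        # url -> response body; the real cache would store a path under
        # _out_cache instead of the body itself.
        self.index: Dict[str, bytes] = {}
        self.fetch_count = 0

    def handle_request(self, request: dict) -> dict:
        action = request.get("action")
        if action == "get":
            url = request["url"]
            if url not in self.index:
                # Cache miss: fetch once and remember the result.
                self.index[url] = self.fetch(url)
                self.fetch_count += 1
            return {"ok": True, "body": self.index[url]}
        # Unknown actions get an error reply instead of crashing the daemon.
        return {"ok": False, "error": f"unknown action: {action!r}"}


if __name__ == "__main__":
    owner = CacheOwner(fetch=lambda url: f"response for {url}".encode())
    owner.handle_request({"action": "get", "url": "https://example.com"})
    owner.handle_request({"action": "get", "url": "https://example.com"})
    print(owner.fetch_count)  # 1: the second request is served from the index
```

Because every request goes through one handler in one process, two clients asking for the same URL also share a single network fetch.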

The approach

An example of the multiprocessing API

Here's an example of a simple daemon that just counts. It doesn't show how we start the process or how we discover it.

Server

from multiprocessing.connection import Listener

class CounterManager:
    def __init__(self):
        self.counter = 0

    def handle_request(self, request):
        action = request.get('action')
        if action == 'increment':
            self.counter += 1
        elif action == 'decrement':
            self.counter -= 1
        elif action == 'reset':
            self.counter = 0
        # Any other action (e.g. 'get') leaves the counter unchanged
        return self.counter  # Always return the current counter value

    def run(self, address):
        # Using `with` closes the listener and each connection even if
        # the loop exits with an error
        with Listener(address) as listener:
            print(f"Counter Manager running on {address}...")
            # Connections are handled one at a time, so updates to
            # self.counter never interleave
            while True:
                with listener.accept() as conn:
                    try:
                        request = conn.recv()  # Receive a message from a client
                    except EOFError:
                        continue  # Client disconnected without sending a request
                    response = self.handle_request(request)
                    conn.send(response)  # Send back the current counter value

if __name__ == '__main__':
    manager = CounterManager()
    manager.run(('localhost', 6000))  # Manager listens on port 6000

Client

from multiprocessing.connection import Client

def send_request(action):
    with Client(('localhost', 6000)) as conn:
        request = {'action': action}
        conn.send(request)  # Send a request (increment, decrement, reset)
        response = conn.recv()  # Receive the current counter value
        return response

if __name__ == '__main__':
    # Example usage of the client
    print(f"Incrementing: {send_request('increment')}")
    print(f"Incrementing: {send_request('increment')}")
    print(f"Decrementing: {send_request('decrement')}")
    print(f"Resetting: {send_request('reset')}")
    print(f"Current Counter: {send_request('get')}")  # 'get' matches no branch, so the value is returned unchanged
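Since the example leaves out starting and discovering the daemon, here is one possible way to fill that gap, as a sketch rather than a settled design: the parent launches the server with multiprocessing.Process, and clients "discover" it simply by retrying a fixed, well-known address until it accepts connections. It also passes an authkey to Listener and Client; without one, any local process can connect, and Connection.recv() unpickles whatever it receives. ADDRESS, AUTHKEY, serve, connect and request are hypothetical names, and a client that fails authentication makes accept() raise AuthenticationError, which this sketch doesn't handle.

```python
import multiprocessing as mp
import time
from multiprocessing.connection import Client, Listener

ADDRESS = ("localhost", 6000)  # well-known address, assumed agreed in advance
AUTHKEY = b"change-me"         # hypothetical shared secret


def serve(address, authkey):
    """Counter daemon that only accepts clients presenting `authkey`."""
    counter = 0
    with Listener(address, authkey=authkey) as listener:
        while True:
            with listener.accept() as conn:
                try:
                    action = conn.recv()["action"]
                except EOFError:
                    continue
                if action == "increment":
                    counter += 1
                conn.send(counter)
                if action == "stop":
                    return  # lets the daemon shut down cleanly


def connect(address, authkey, timeout=5.0):
    # "Discovery" is just retrying the well-known address until the daemon
    # accepts connections or the timeout expires.
    deadline = time.monotonic() + timeout
    while True:
        try:
            return Client(address, authkey=authkey)
        except ConnectionRefusedError:
            if time.monotonic() > deadline:
                raise
            time.sleep(0.05)


def request(action, address=ADDRESS, authkey=AUTHKEY):
    with connect(address, authkey) as conn:
        conn.send({"action": action})
        return conn.recv()


if __name__ == "__main__":
    # daemon=True: the server process is terminated when this script exits.
    server = mp.Process(target=serve, args=(ADDRESS, AUTHKEY), daemon=True)
    server.start()
    print(request("increment"))  # 1
    print(request("increment"))  # 2
    print(request("stop"))       # 2, after which the server exits
    server.join()
```

Retrying a fixed address is the simplest form of discovery; writing the chosen address to a known file would be an alternative if the port can't be fixed in advance.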

What I want to do is use some form of IPC to create a message passing interface to interact with the cache.
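One way that message-passing interface could look, sketched under assumptions: client code talks to a CacheProxy whose methods turn calls into request/reply messages, and a CacheServer (standing in for the daemon) is the only code that touches the store, in the spirit of a synchronous Erlang gen_server call. CacheServer, CacheProxy, CacheError, the {"op": ...} message format and the authkey are all hypothetical. Binding to port 0 lets the OS pick a free port, which Listener.address then reports.

```python
import threading
from multiprocessing.connection import Client, Listener


class CacheError(Exception):
    """Raised on the client side when the server replies with an error."""


class CacheServer:
    """Hypothetical cache owner: the only code that touches `store`."""

    def __init__(self, address=("localhost", 0), authkey=b"cache"):
        # Port 0 asks the OS for a free port; listener.address reports it.
        self.listener = Listener(address, authkey=authkey)
        self.address = self.listener.address
        self.store = {}

    def handle(self, msg):
        op = msg.get("op")
        if op == "put":
            self.store[msg["key"]] = msg["value"]
            return {"ok": True}
        if op == "get":
            if msg["key"] not in self.store:
                return {"ok": False, "error": f"miss: {msg['key']!r}"}
            return {"ok": True, "value": self.store[msg["key"]]}
        return {"ok": False, "error": f"unknown op: {op!r}"}

    def serve_forever(self):
        # One connection at a time, so messages are applied in order and
        # never interleave.
        with self.listener:
            while True:
                with self.listener.accept() as conn:
                    try:
                        msg = conn.recv()
                    except EOFError:
                        continue
                    conn.send(self.handle(msg))


class CacheProxy:
    """What client code would use instead of touching the cache directly."""

    def __init__(self, address, authkey=b"cache"):
        self.address = address
        self.authkey = authkey

    def _call(self, **msg):
        # One request/reply round trip per call.
        with Client(self.address, authkey=self.authkey) as conn:
            conn.send(msg)
            reply = conn.recv()
        if not reply["ok"]:
            raise CacheError(reply["error"])
        return reply

    def put(self, key, value):
        self._call(op="put", key=key, value=value)

    def get(self, key):
        return self._call(op="get", key=key)["value"]


if __name__ == "__main__":
    server = CacheServer()
    # A thread keeps the demo in one file; the daemon would normally be
    # its own process.
    threading.Thread(target=server.serve_forever, daemon=True).start()
    cache = CacheProxy(server.address)
    cache.put("https://example.com", b"<html>...</html>")
    print(cache.get("https://example.com"))
```

The proxy keeps the IPC details out of calling code, so something like CachedClientSession could in principle swap direct cache access for proxy calls without changing its own callers.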