ATLBitLab / abbot

Abbot (ATL BitLab Bot)
https://atlbitlab.com/abbot
MIT License
2 stars 2 forks source link

Create AbbotBuilder: Real-time Event Handling #53

Closed bnonni closed 11 months ago

bnonni commented 11 months ago

Something along these lines:

import asyncio

class AppBuilder:
    """Fluent builder for an asyncio TCP server that routes messages by ``kind``.

    Handlers are registered in three groups. For every chunk read from a
    client connection the chunk is parsed, its ``kind`` field is looked up,
    and the handlers of the matching group are awaited in registration order:

    * group 1 -- kind 4
    * group 2 -- kinds 40, 41, 42, 43 and 44
    * group 3 -- kind 21021

    Each handler is an async callable taking ``(data, reader, writer)``, where
    ``data`` is the raw bytes chunk that was read.
    """

    # Maximum number of bytes requested from the stream per read.
    # NOTE: there is no message framing; a message longer than this (or split
    # across TCP segments) arrives as several chunks that are parsed separately.
    READ_SIZE = 100

    # Tuples (not sets) so that an unhashable ``kind`` value simply fails to
    # match instead of raising TypeError.
    GROUP1_KINDS = (4,)
    GROUP2_KINDS = (40, 41, 42, 43, 44)
    GROUP3_KINDS = (21021,)

    def __init__(self, host='127.0.0.1', port=8888):
        """Create a builder with no handlers registered.

        Args:
            host: Interface the server binds to.
            port: TCP port the server listens on.
        """
        self.host = host
        self.port = port
        self.group1_handlers = []
        self.group2_handlers = []
        self.group3_handlers = []

    def add_group1_handler(self, handler):
        """Register *handler* for group 1 and return ``self`` for chaining."""
        self.group1_handlers.append(handler)
        return self

    def add_group2_handler(self, handler):
        """Register *handler* for group 2 and return ``self`` for chaining."""
        self.group2_handlers.append(handler)
        return self

    def add_group3_handler(self, handler):
        """Register *handler* for group 3 and return ``self`` for chaining."""
        self.group3_handlers.append(handler)
        return self

    def parse_message(self, data):
        """Decode one raw chunk into a message dict, or ``None`` if it is unusable.

        The default implementation treats the chunk as a single JSON object.
        Override this method to support a different wire format; it must
        return a dict (whose ``'kind'`` entry drives dispatch) or ``None``.
        """
        import json  # local import: keeps this block self-contained

        try:
            message = json.loads(data)
        except ValueError:
            # Covers both invalid JSON and undecodable bytes.
            return None
        return message if isinstance(message, dict) else None

    def _handlers_for(self, kind):
        """Return the handler list registered for *kind* (empty if none)."""
        if kind in self.GROUP1_KINDS:
            return self.group1_handlers
        if kind in self.GROUP2_KINDS:
            return self.group2_handlers
        if kind in self.GROUP3_KINDS:
            return self.group3_handlers
        return ()

    async def handle_client(self, reader, writer):
        """Serve one client connection until the peer sends EOF.

        Chunks that cannot be parsed, or whose ``kind`` belongs to no group,
        are ignored. The writer is always closed when the loop ends, including
        when a handler raises.
        """
        try:
            while True:
                data = await reader.read(self.READ_SIZE)
                if not data:
                    break

                message = self.parse_message(data)
                if not isinstance(message, dict):
                    continue

                for handler in self._handlers_for(message.get('kind', None)):
                    await handler(data, reader, writer)
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except ConnectionError:
                # The peer already dropped the connection; nothing left to do.
                pass

    async def run(self):
        """Start listening on ``(host, port)`` and serve until cancelled."""
        server = await asyncio.start_server(
            self.handle_client, self.host, self.port
        )
        addr = server.sockets[0].getsockname()
        print(f'Serving on {addr}')

        async with server:
            await server.serve_forever()

# Example handlers
async def group1_handler(data, reader, writer):
    """Example group-1 handler: print the raw chunk to stdout.

    ``reader`` and ``writer`` are accepted to satisfy the handler signature
    but are not used.
    """
    line = f"Group 1 received: {data}"
    print(line)

async def group2_handler(data, reader, writer):
    """Example group-2 handler: print the raw chunk to stdout.

    ``reader`` and ``writer`` are accepted to satisfy the handler signature
    but are not used.
    """
    line = f"Group 2 received: {data}"
    print(line)

async def group3_handler(data, reader, writer):
    """Example group-3 handler: print the raw chunk to stdout.

    ``reader`` and ``writer`` are accepted to satisfy the handler signature
    but are not used.
    """
    line = f"Group 3 received: {data}"
    print(line)

# Example usage
# Wire the example handlers into a builder; `app` stays a module-level name.
app = (
    AppBuilder()
    .add_group1_handler(group1_handler)
    .add_group2_handler(group2_handler)
    .add_group3_handler(group3_handler)
)

# Start the server only when executed as a script: `app.run()` serves forever,
# so running it unconditionally would block any module that imports this one.
if __name__ == "__main__":
    asyncio.run(app.run())
bnonni commented 11 months ago

Switched to using nostr-sdk instead, so this is no longer needed.