mosquito / cysystemd

systemd wrapper on Cython
Apache License 2.0
101 stars 21 forks source link

async io example stuck #65

Open gertcuykens opened 3 weeks ago

gertcuykens commented 3 weeks ago
import asyncio
import logging
import json

from cysystemd.reader import JournalOpenMode, Rule
from cysystemd.async_reader import AsyncJournalReader

record_count = 0

async def log(name="test"):
    global record_count
    rules = Rule("CONTAINER_NAME", name)
    reader = AsyncJournalReader()
    await reader.open(JournalOpenMode.SYSTEM)
    await reader.add_filter(rules)
    await reader.seek_tail()
    logging.info("Starting journal monitoring.")
    while await reader.wait():
        print('reading')
        async for record in reader:
            print(record_count)
            record_count += 1
            logging.info(record.data.get('MESSAGE'))

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    try:
        asyncio.run(log())
    except KeyboardInterrupt:
        print(record_count)
        logging.info("Stopping journal monitoring.")

this will end up in deadlock if new entries are coming in fast enough, i think there is a missing peace of the puzel where the systemd implementation has the following in addition in there handler.

    def handler(self):
         if self.journal.process() != journal.APPEND:
             return
         for entry in self.journal:
             message = entry.get("MESSAGE", "")
             logging.info(f"{message}")

So i believe this need to be implemented as wel what ever this self.journal.process() != journal.APPEND means

mosquito commented 3 weeks ago

Do you want to contribute it?