faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Question: how to enable JSON logging in Faust? #468

Open GlorianY opened 1 year ago

GlorianY commented 1 year ago

Hi,

I would like to ask: how can I get Faust to emit its logs in JSON format?

I have tried setting Faust's loghandlers to a JSON-formatting handler from pythonjsonlogger:

import logging

from pythonjsonlogger import jsonlogger

logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)

app = faust.App(
           ...,
           loghandlers=[logHandler],
)

But this does not JSON-format Faust's log output.

Also, I have tried using jsonlogger in my root log config, as specified here. I then pass the log config dictionary (in logging.config.dictConfig() format) to the logging_config parameter, as in

app = faust.App(
           ...,
           logging_config=json_log_dict_config,
)
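
For reference, json_log_dict_config itself looks roughly like this (a minimal sketch; the handler and formatter names are illustrative):

json_log_dict_config = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            # instantiate pythonjsonlogger's formatter via dictConfig's "()" factory key
            "()": "pythonjsonlogger.jsonlogger.JsonFormatter",
            "format": "%(asctime)s %(levelname)s %(name)s %(message)s",
        },
    },
    "handlers": {
        "json": {
            "class": "logging.StreamHandler",
            "formatter": "json",
        },
    },
    "root": {
        "level": "INFO",
        "handlers": ["json"],
    },
}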

However, this does not seem to JSON-format the Faust logs either.

The faust-streaming release which I am using is "0.10.1".

Could you help me to resolve this issue?

Thank you very much for your help!


wbarnha commented 1 year ago

I ran the following with Faust v0.10.4 by modifying the example in examples/word_count.py:

#!/usr/bin/env python
import asyncio
import faust

import logging
from pythonjsonlogger import jsonlogger

logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)

WORDS = ['the', 'quick', 'brown', 'fox']

app = faust.App(
    'word-counts',
    broker='kafka://localhost:9092',
    store='rocksdb://',
    version=1,
    loghandlers=[logHandler],
    topic_partitions=8,
)

posts_topic = app.topic('posts', value_type=str)
word_counts = app.Table('word_counts', default=int,
                        help='Keep count of words (str to int).')

@app.agent(posts_topic)
async def shuffle_words(posts):
    async for post in posts:
        for word in post.split():
            await count_words.send(key=word, value=word)

last_count = {w: 0 for w in WORDS}

@app.agent(value_type=str)
async def count_words(words):
    """Count words from blog post article body."""
    async for word in words:
        word_counts[word] += 1
        last_count[word] = word_counts[word]

@app.page('/count/{word}/')
@app.table_route(table=word_counts, match_info='word')
async def get_count(web, request, word):
    return web.json({
        word: word_counts[word],
    })

@app.page('/last/{word}/')
@app.topic_route(topic=posts_topic, match_info='word')
async def get_last(web, request, word):
    return web.json({
        word: last_count,
    })

@app.task
async def sender():
    await posts_topic.maybe_declare()

    for word in WORDS:
        for _ in range(1000):
            await shuffle_words.send(value=word)

    await asyncio.sleep(5.0)
    print(word_counts.as_ansitable(
        key='word',
        value='count',
        title='$$ TALLY $$',
        sort=True,
    ))

@app.on_rebalance_complete.connect
async def on_rebalance_complete(sender, **kwargs):
    print(word_counts.as_ansitable(
        key='word',
        value='count',
        title='$$ TALLY - after rebalance $$',
        sort=True,
    ))

if __name__ == '__main__':
    app.main()

Run this in your terminal:

faust -A examples.word_count worker -l info

Sample log:

{"message": "[^---Fetcher]: Starting...", "extra": ""}
[2023-03-16 10:09:17,684] [26131] [INFO] [^---Recovery]: Worker ready 
{"message": "[^---Recovery]: Worker ready", "extra": ""}
[2023-03-16 10:09:17,684] [26131] [INFO] [^Worker]: Ready 
{"message": "[^Worker]: Ready", "extra": ""}
[2023-03-16 10:09:17,685] [26131] [INFO] [^--Producer]: Creating topic 'posts' 
{"message": "[^--Producer]: Creating topic 'posts'", "extra": ""}
[2023-03-16 10:09:24,427] [26131] [WARNING] ┌$$ Tally $$────┐
│ Word  │ Count │
├───────┼───────┤
│ brown │ 1232  │
│ fox   │ 1233  │
│ quick │ 1232  │
│ the   │ 1517  │
└───────┴───────┘ 
{"message": "\u001b(0l\u001b(B$$ Tally $$\u001b(0qqqqk\u001b(B\n\u001b(0x\u001b(B Word  \u001b(0x\u001b(B Count \u001b(0x\u001b(B\n\u001b(0tqqqqqqqnqqqqqqqu\u001b(B\n\u001b(0x\u001b(B brown \u001b(0x\u001b(B 1232  \u001b(0x\u001b(B\n\u001b(0x\u001b(B fox   \u001b(0x\u001b(B 1233  \u001b(0x\u001b(B\n\u001b(0x\u001b(B quick \u001b(0x\u001b(B 1232  \u001b(0x\u001b(B\n\u001b(0x\u001b(B the   \u001b(0x\u001b(B 1517  \u001b(0x\u001b(B\n\u001b(0mqqqqqqqvqqqqqqqj\u001b(B", "extra": ""}
GlorianY commented 1 year ago

Thanks for your reply, @wbarnha!

Looking at the output, it seems that not all of the logs are JSON-formatted? For example:

[2023-03-16 10:09:17,684] [26131] [INFO] [^---Recovery]: Worker ready 
[2023-03-16 10:09:17,684] [26131] [INFO] [^Worker]: Ready 
[2023-03-16 10:09:17,685] [26131] [INFO] [^--Producer]: Creating topic 'posts' 
[2023-03-16 10:09:24,427] [26131] [WARNING] ┌$$ Tally $$────┐
│ Word  │ Count │
├───────┼───────┤
│ brown │ 1232  │
│ fox   │ 1233  │
│ quick │ 1232  │
│ the   │ 1517  │
└───────┴───────┘ 

I have also tried running faust --json -A .... But this does not seem to JSON-format all of the Faust logs either.

wbarnha commented 1 year ago
[2023-03-16 11:24:55,445] [13612] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
{"message": "[^---Recovery]: Seek stream partitions to committed offsets.", "extra": ""}
[2023-03-16 11:24:55,455] [13612] [INFO] [^---Fetcher]: Starting... 
{"message": "[^---Fetcher]: Starting...", "extra": ""}
[2023-03-16 11:24:55,455] [13612] [INFO] [^---Recovery]: Worker ready 
{"message": "[^---Recovery]: Worker ready", "extra": ""}
[2023-03-16 11:24:55,455] [13612] [INFO] [^Worker]: Ready 
{"message": "[^Worker]: Ready", "extra": ""}
[2023-03-16 11:24:55,456] [13612] [INFO] [^--Producer]: Creating topic 'posts' 
{"message": "[^--Producer]: Creating topic 'posts'", "extra": ""}

Odd that it's doing that for you. I wonder if something internal is messing with the logging.

GlorianY commented 1 year ago

@wbarnha Hmm, if you take a closer look at the logs:

[2023-03-16 11:24:55,455] [13612] [INFO] [^Worker]: Ready 
{"message": "[^Worker]: Ready", "extra": ""}
[2023-03-16 11:24:55,456] [13612] [INFO] [^--Producer]: Creating topic 'posts' 
{"message": "[^--Producer]: Creating topic 'posts'", "extra": ""}

They are printed twice: once in JSON format and once without.

Is there a way to remove the duplicated logs?

I have tried to mute these logs with logger.propagate=False, but they still appear.
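
Concretely, what I tried was along these lines (the logger name 'faust' is my guess at which logger produces the duplicates):

import logging

# stop records from this logger from propagating up to the root logger's handlers
logging.getLogger('faust').propagate = False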

crflynn commented 1 year ago

The duplicates come from this "default" log handler that gets created in the mode lib: https://github.com/ask/mode/blob/a104009f0c96790b9f6140179b4968da07a38c81/mode/utils/logging.py#L463

If you change the line to

        'handlers': [],

they go away.
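
For context, that line sits in mode's _setup_logging, which builds its dictConfig roughly like this (paraphrased from the linked source):

config = create_logconfig(handlers=handlers, root={
    'level': level,
    'handlers': ['default'],  # this extra 'default' handler emits the plain-text duplicates
})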

GlorianY commented 1 year ago

> The duplicates come from this "default" log handler that gets created in the mode lib: https://github.com/ask/mode/blob/a104009f0c96790b9f6140179b4968da07a38c81/mode/utils/logging.py#L463
>
> If you change the line to
>
>         'handlers': [],
>
> they go away.

@crflynn: But the code you are referring to doesn't seem to take an argument for supplying the handlers value, right? If that is the case, how can we set 'handlers': []?

crflynn commented 1 year ago

With the way it's written, you'd have to patch it out. I do something like this:

# patch.py
import logging
import logging.config
from typing import IO
from typing import Dict
from typing import List
from typing import Union

from mode.utils import logging as mode_logging

def _setup_logging(
    *,
    level: Union[int, str] = None,
    filename: str = None,
    stream: IO = None,
    loghandlers: List[logging.Handler] = None,
    logging_config: Dict = None,
) -> None:
    handlers = {}
    if filename:
        assert stream is None
        handlers.update(
            {
                "default": {
                    "level": level,
                    "class": "logging.FileHandler",
                    "formatter": "default",
                    "filename": filename,
                },
            }
        )
    elif stream:
        handlers.update(
            {
                "default": {
                    "level": level,
                    "class": "colorlog.StreamHandler",
                    "formatter": "colored",
                },
            }
        )
    config = mode_logging.create_logconfig(
        handlers=handlers,
        root={
            "level": level,
            "handlers": [],  # my change here
        },
    )
    if logging_config is None:
        logging_config = config
    elif logging_config.pop("merge", False):
        logging_config = {**config, **logging_config}
        for k in ("formatters", "filters", "handlers", "loggers", "root"):
            logging_config[k] = {**config.get(k, {}), **logging_config.get(k, {})}
    logging.config.dictConfig(logging_config)
    if loghandlers is not None:
        logging.root.handlers.extend(loghandlers)

def patch_mode_logging():
    """This patches the logging setup in the mode lib.
    All it does is remove the "default" loghandler that is created
    when you do not pass a log config.
    """
    mode_logging._setup_logging = _setup_logging

And then, in your config file or wherever you set up your logging, you can just call that function:

# config.py
from .patch import patch_mode_logging

patch_mode_logging()
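
Note that patch_mode_logging() has to run before the worker configures logging, since mode applies _setup_logging during worker startup; importing the patch early, as above, is what makes it take effect.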