pathwaycom / pathway

Python ETL framework for stream processing, real-time analytics, LLM pipelines, and RAG.
https://pathway.com

[Bug]: Filewriter doesn't seem to write any data to json while using custom connector #34

Closed TaufeeqNoamaan closed 2 months ago

TaufeeqNoamaan commented 2 months ago

Steps to reproduce

I'm trying to connect Reddit to Pathway using a custom Python connector.

This is the code:

```python
import pathway as pw
import praw


class RedditClient:
    # The client_id and client_secret values were left blank in the original post.
    def __init__(self, keyword, client_id= , client_secret= , user_agent='pathway::1.0 (by /u/)'):
        self.reddit = praw.Reddit(client_id=client_id, client_secret=client_secret, user_agent=user_agent)
        self.keyword = keyword

    def get_top_comments(self, limit=10):
        # Search all subreddits for the keyword and collect top-level comment bodies.
        submissions = self.reddit.subreddit('all').search(self.keyword, sort='top', limit=limit)
        top_comments = []
        for submission in submissions:
            submission.comments.replace_more(limit=0)
            for comment in submission.comments:
                if isinstance(comment, praw.models.Comment):
                    top_comments.append(comment.body)
        return top_comments

    def read(self):
        top_comments = self.get_top_comments()
        for idx, comment in enumerate(top_comments, start=1):
            yield {'key': idx, 'text': comment}

    def write(self, data):
        raise NotImplementedError("This connector does not support writing")

    def delete(self, key):
        raise NotImplementedError("This connector does not support deleting")

    def update(self, key, data):
        raise NotImplementedError("This connector does not support updating")


class RedditSubject(pw.io.python.ConnectorSubject):
    _reddit_client: RedditClient

    def __init__(self, keyword) -> None:
        super().__init__()
        self._reddit_client = RedditClient(keyword)

    def run(self) -> None:
        top_comments = self._reddit_client.get_top_comments()
        for comment in top_comments:
            print("Emitting comment:", comment)  # Debug statement
            self.next_json({"text": comment})

    def on_stop(self) -> None:
        pass  # RedditClient doesn't have a disconnect method


class InputSchema(pw.Schema):
    key: int = pw.column_definition(primary_key=True)
    text: str


input = pw.io.python.read(
    RedditSubject("python"),
    schema=InputSchema,
    autocommit_duration_ms=1000,
)
pw.io.jsonlines.write(input, "table.jsonlines")
pw.run()
```

Reddit client_id and secret can be obtained from here: https://www.reddit.com/prefs/apps

Relevant log output

LOGS
ERROR    Read data parsed unsuccessfully. field key with no JsonPointer path specified is absent in {"text":"[deleted]"}
[04/02/24 17:20:06] ERROR    Read data parsed unsuccessfully. field key with no JsonPointer path specified is absent in {"text":"What is this? A reddit thread that's being generally pro-vegan? \n\nWhat's next, civil discourse with people adjusting their views when presented with new info?!?"}
ERROR    Read data parsed unsuccessfully. field key with no JsonPointer path specified is absent in {"text":"Vegans

What did you expect to happen?

The data should've been written to the json file

Version

0.8.5

Docker Versions (if used)

No response

OS

Linux

On which CPU architecture did you run Pathway?

x86-64

KamilPiechowiak commented 2 months ago

Hey @TaufeeqNoamaan, the error message indicates that you are not passing the `key` field to Pathway. Indeed, the message contains only the `text` field: in your code, you call `self.next_json({"text": comment})`, so only `text` is passed to Pathway. Please add the `key` field there as well and it should work. Please let me know if that helps.
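The check behind that log line can be approximated in plain Python. This is only a rough sketch: `REQUIRED_FIELDS` and `missing_fields` are hypothetical names made up for illustration, not part of Pathway's API, and the real parser does more than this.

```python
import json

# Hypothetical stand-in for the columns declared in InputSchema (key, text).
REQUIRED_FIELDS = ("key", "text")


def missing_fields(raw_message: str, required=REQUIRED_FIELDS) -> list[str]:
    """Return the required field names that are absent from a JSON message."""
    record = json.loads(raw_message)
    return [name for name in required if name not in record]


# Message shaped like the one in the issue: only "text" is present.
print(missing_fields(json.dumps({"text": "[deleted]"})))  # ['key']

# Message that carries both schema fields.
print(missing_fields(json.dumps({"key": 1, "text": "[deleted]"})))  # []
```

Every field declared in the schema has to appear in each emitted message, so the dictionary passed to `next_json` should carry both `key` and `text`.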

TaufeeqNoamaan commented 2 months ago

Hey @KamilPiechowiak

I rectified it, but the same error still persists:

def run(self) -> None:
    top_comments = self._reddit_client.get_top_comments()
    for idx, comment in enumerate(top_comments, start=1):
        self._subject.next_json({"key": idx, "text": comment})
        yield {"key": idx, "text": comment}

The dashboard won't open now

KamilPiechowiak commented 2 months ago

Please paste the error message. Are the error messages being logged for each input message again?

TaufeeqNoamaan commented 2 months ago

After including the "key" param, I'm not getting any error logs as such. The dashboard won't open and the file just executes and terminates

KamilPiechowiak commented 2 months ago

Does it terminate with an error? Did you check if the messages are written to table.jsonlines after executing the program?

TaufeeqNoamaan commented 2 months ago

Nope, doesn't terminate with an error and no messages are written to jsonlines files.

Also tried output with csv, same thing.

KamilPiechowiak commented 2 months ago

Your run method should look like this:

def run(self) -> None:
    top_comments = self._reddit_client.get_top_comments()
    for idx, comment in enumerate(top_comments, start=1):
        self.next_json({"key": idx, "text": comment})

In the code you pasted, you call `self._subject.next_json`, which suggests that your method is probably defined outside of `RedditSubject`, so it is impossible for me to say what's wrong. Please paste the full version of your code (and please take care to format it properly).
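One further detail about the `run` snippet posted earlier in the thread, independent of Pathway: it contains a `yield`, and a Python function whose body contains `yield` is a generator function. Calling it only creates a generator object, and none of the body runs until something iterates over it. That may explain why the program exited quietly without writing anything, although the thread does not confirm it. The sketch below uses a hypothetical `FakeSubject` class that records messages in a list instead of sending them to Pathway.

```python
class FakeSubject:
    """Hypothetical stand-in for a connector subject; not Pathway's API."""

    def __init__(self) -> None:
        self.emitted = []

    def next_json(self, message: dict) -> None:
        # Record the message instead of handing it to an engine.
        self.emitted.append(message)

    def run_with_yield(self, comments):
        # The yield below makes this a generator function.
        for idx, comment in enumerate(comments, start=1):
            self.next_json({"key": idx, "text": comment})
            yield {"key": idx, "text": comment}

    def run_plain(self, comments) -> None:
        # No yield: the loop executes as soon as the method is called.
        for idx, comment in enumerate(comments, start=1):
            self.next_json({"key": idx, "text": comment})


subject = FakeSubject()
subject.run_with_yield(["a", "b"])  # body never runs: nobody iterates the generator
print(len(subject.emitted))  # 0

subject.run_plain(["a", "b"])
print(len(subject.emitted))  # 2
```

Dropping the `yield` and calling `self.next_json` directly, as in the corrected `run` method above, avoids this pitfall.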

TaufeeqNoamaan commented 2 months ago

You are right @KamilPiechowiak, it was indeed a problem in the scope of the method.

Issue is solved!

TaufeeqNoamaan commented 2 months ago

Thanks a lot!!

KamilPiechowiak commented 2 months ago

I'm glad that your problem is solved.