TulipJani / Zedd

9 stars 0 forks source link

I updated main.py to now support wayland instead of x11 #1

Open jamieduk opened 1 month ago

jamieduk commented 1 month ago

I dont know how to send so here is the code

import pygame
from config import AUDIO_FILE_PATH, BACKGROUND_MUSIC_FILES
from audio import change_background_music, speechRecognition
from chat import get_response_with_prompt, switch_persona
from search import handle_search
from weather import get_weather
from utils import terminate_program, open_application, play_video_on_youtube, play_song_on_spotify, google_search, speak_response
from datetime import datetime
import time
import threading
from calendar_handler import initialize_calendar, add_event, list_events
from web_scraper import fetch_news
from whatsapp_handler import send_whatsapp_message
import sys
import random
import shutil
from colorama import Fore, init
from blessed import Terminal
from animation import init_animation

class NullWriter(object):
    def write(self, arg):   
        pass

sys.stdout=NullWriter()
sys.stdout=sys.__stdout__
init(autoreset=True)
pygame.mixer.init()
ascii_art="""

 #######    ##       ####     ##              ##   ##    #####    ####     ##### 
      ##   ####     ##  ##   ####             ##   ##   ##       ##  ##   ##     
     ##    ## ##   ##   ##   ## ##            ##   ##  ##       ##   ##  ##      
    ##    ##   ##  ######   ##   ##           #######  ######   ######   ######  
   ##     #######  ####     #######           ##   ##  ##       ####     ##      
  ##      ##   ##  ## ##    ##   ##           ##   ##  ##       ## ##    ##      
 #######  ##   ##  ##  ##   ##   ##           ##   ##  #######  ##  ##   ####### 

"""
def show_loading_animation():
    total_length=50
    terminal_width=shutil.get_terminal_size().columns

    for percent in range(101):
        bar=('#' * (percent * total_length // 100)).ljust(total_length)
        speed=random.uniform(10, 50)
        progress=f"Progress: |{bar}| {percent}%"
        speed_info=f"Speed: {speed:.2f} KB/s"

        sys.stdout.write(f"\r{progress} {speed_info}".ljust(terminal_width))
        sys.stdout.flush()
        time.sleep(0.01)

    print("\n")

def handle_help_command():
    print(Fore.GREEN + "\nAvailable Commands:")
    print(Fore.GREEN + "    - help: Display this help message")
    print(Fore.GREEN + "    - open [app_name]: Open a specified application")
    print(Fore.GREEN + "      Example: 'open Chrome'")
    print(Fore.GREEN + "    - play music: Play relaxing music on YouTube")
    print(Fore.GREEN + "    - weather: Get the current weather")
    print(Fore.GREEN + "    - add [task]: Add a task to the to-do list")
    print(Fore.GREEN + "      Example: 'add Complete homework on Math'")
    print(Fore.GREEN + "    - schedule: Display today's schedule")
    print(Fore.GREEN + "    - send [message] to [contact]: Send a WhatsApp message")
    print(Fore.GREEN + "      Example: 'send Hi there! to Mom'")
    print(Fore.GREEN + "    - search [query] on [platform]: Search on specified platform")
    print(Fore.GREEN + "      Example: 'search Python tutorials on YouTube'")
    print(Fore.GREEN + "    - play [playlist]: Play a playlist on Spotify")
    print(Fore.GREEN + "      Example: 'play Chill Out playlist'")
    print(Fore.GREEN + "    - news: Get the latest news")
    print(Fore.GREEN + "    - switch to [persona]: Switch to a different assistant persona")
    print(Fore.GREEN + "      Example: 'switch to Creative Zara'")

TODO_FILE="todo.txt"
def add_to_todo(task):
    with open(TODO_FILE, "a") as file:
        file.write(f"{task}\n")

def get_todo_list():
    try:
        with open(TODO_FILE, "r") as file:
            tasks=[task.strip() for task in file.readlines()]
            pending_tasks=[task for task in tasks if not task.startswith("[Completed]")]
            completed_tasks=[task for task in tasks if task.startswith("[Completed]")]
            return pending_tasks, completed_tasks
    except FileNotFoundError:
        return [], []

def delete_task(task_index):
    pending_tasks, completed_tasks=get_todo_list()
    if task_index < len(pending_tasks):
        del pending_tasks[task_index]
        with open(TODO_FILE, "w") as file:
            file.write("\n".join(pending_tasks + completed_tasks))

def mark_task_completed(task_index):
    pending_tasks, completed_tasks=get_todo_list()
    if task_index < len(pending_tasks):
        completed_tasks.append(f"[Completed] {pending_tasks[task_index]}")
        del pending_tasks[task_index]
        with open(TODO_FILE, "w") as file:
            file.write("\n".join(pending_tasks + completed_tasks))

def remind_todo():
    while True:
        time.sleep(60 * 30)
        tasks=get_todo_list()
        if tasks:
            pending_tasks="\n".join(f"- {task}" for task in tasks)
            speak_response("You have pending tasks in your to-do list.")
            print(Fore.GREEN + f"Zara: You have pending tasks:\n{pending_tasks}")

def play_music(file_path):
    pygame.mixer.music.load(file_path)
    pygame.mixer.music.play(-1)
    pygame.mixer.music.set_volume(.1)

def pause_music():
    if pygame.mixer.music.get_busy():
        pygame.mixer.music.pause()

def unpause_music():
    if not pygame.mixer.music.get_busy():
        pygame.mixer.music.unpause()

def change_music():
    change_background_music(BACKGROUND_MUSIC_FILES, pygame.mixer.music)

def process_input(user_input):
    if user_input.startswith("send me"):
        topic=user_input[len("send me "):]
        response=get_response_with_prompt(topic)
        send_whatsapp_message("+919081321913", response)
    elif user_input.startswith("send"):
        parts=user_input.split(maxsplit=3)
        if len(parts) == 4 and parts[2] == "to":
            topic=parts[1]
            contact_name=parts[3]
            if contact_name in contacts:
                phone_no=contacts[contact_name]
                response=get_response_with_prompt(topic)
                send_whatsapp_message(phone_no, response)
            else:
                print(Fore.GREEN + f"Contact {contact_name} not found in contacts.")
        else:
            print(Fore.GREEN + "Error: Please provide the correct command format like 'send topic to contact name'.")

def main():
    is_music_playing=False
    command_count=0

    threading.Thread(target=remind_todo, daemon=True).start()

    play_music(AUDIO_FILE_PATH)
    is_music_playing=True
    print(Fore.YELLOW + ascii_art)

    print(Fore.GREEN + "Zara: Hi there, I'm Zara.")
    first_input=True

    command_counter=0

    while True:
        user_input=input("You: ")
        command_counter += 1

        if first_input:
            first_input=False
            init_animation()

        if any(command in user_input for command in ["quit", "exit", "sleep", "deactivate"]):
            terminate_program()
            break

        elif user_input == "help":
            handle_help_command()

        elif user_input.startswith("switch to "):
            persona_name=user_input.split("switch to ")[1].strip().title()
            switch_persona(persona_name)
            print(Fore.GREEN + f"Zara: Switched to {persona_name}.")

        elif user_input.startswith("open "):
            app_to_open=user_input[5:].strip()
            open_application(app_to_open)

        elif "tell me about the weather" in user_input.lower() or "what's the weather today" in user_input.lower():
            response=get_weather()
            print(Fore.GREEN + f"Zara: {response}")

        elif "what time is it" in user_input or "current time" in user_input:
            current_time=datetime.now().strftime("%H:%M:%S")
            print(Fore.GREEN + f"Zara: The current time is {current_time}")

        elif user_input.startswith(("play", "stream", "start", "broadcast")):
            command, song_info=user_input.split(maxsplit=1)
            if "on" in song_info:
                parts=list(map(str.strip, song_info.rsplit("on", 1)))
                if len(parts) == 2:
                    title, platform=parts
                    platform=platform.lower()

                    if platform == "spotify":
                        play_song_on_spotify(title)
                    elif platform == "youtube":
                        play_video_on_youtube(title)
                    else:
                        print(Fore.GREEN + f"Unsupported platform: {platform}. Supported platforms are 'spotify' and 'youtube'.")
                else:
                    print(Fore.GREEN + "Invalid command format. Please specify the song title and platform.")
            else:
                play_video_on_youtube(song_info)

        elif user_input.startswith("google "):
            query=user_input[7:]
            google_search(query)

        elif user_input.startswith("search "):
            query=user_input[7:].strip()
            handle_search(query)

        elif user_input.startswith("add "):
            task=user_input[4:].strip()
            add_to_todo(task)
            print(Fore.GREEN + f"Zara: Added to to-do list: {task}")

        elif user_input == "schedule":
            pending_tasks, completed_tasks=get_todo_list()
            print(Fore.GREEN + "Zara: Here are your pending tasks:")
            for index, task in enumerate(pending_tasks):
                print(Fore.GREEN + f"{index + 1}. {task}")

        elif user_input.startswith("delete "):
            try:
                task_index=int(user_input.split()[1]) - 1
                delete_task(task_index)
                print(Fore.GREEN + "Zara: Task deleted.")
            except (IndexError, ValueError):
                print(Fore.GREEN + "Invalid task index.")

        elif user_input.startswith("mark completed "):
            try:
                task_index=int(user_input.split()[2]) - 1
                mark_task_completed(task_index)
                print(Fore.GREEN + "Zara: Task marked as completed.")
            except (IndexError, ValueError):
                print(Fore.GREEN + "Invalid task index.")

        elif user_input == "news":
            fetch_news()

        else:
            response=get_response_with_prompt(user_input)
            print(Fore.GREEN + f"Zara: {response}")

    pygame.mixer.music.stop()

if __name__ == "__main__":
    main()

Instructions of use

sudo apt install -y espeak

create a .env file

edit config.py file!

python -m venv venv && source venv/bin/activate 

pip install -r requirements.txt

now give it access then run!

xhost +

python main.py

that xhost command needs to run before so after a restart run again its not permanent! (you can put in a bash script that then runs the py file aswell

Example startup script to automate this

#!/bin/bash
# (c) J~Net 2024
#
echo "Starting Zed"

python -m venv venv && source venv/bin/activate 

xhost +

python main.py

./start.sh

jamieduk commented 1 month ago

also heres a more advanced version that renders chat.py unneeded and added ollama support for local llm when it dont have the answer!

import pygame
from config import AUDIO_FILE_PATH, BACKGROUND_MUSIC_FILES
from audio import change_background_music, speechRecognition
from search import handle_search
from weather import get_weather
from utils import terminate_program, open_application, play_video_on_youtube, play_song_on_spotify, google_search, speak_response
from datetime import datetime
import time
import threading
from calendar_handler import initialize_calendar, add_event, list_events
from web_scraper import fetch_news
from whatsapp_handler import send_whatsapp_message
import sys
import random
import shutil
from colorama import Fore, init
from blessed import Terminal
from animation import init_animation
import ollama

# Initialize your Ollama object
ollama_model="crewai-tinyllama"  # Define the model to use

TODO_FILE="todo.txt"  # Todo file!

def get_response_with_prompt(prompt):
    # Example responses for demonstration purposes
    if "hi" in prompt.lower():
        #speak_response("Hello! How can I assist you today?")
        return "Hello! How can I assist you today?"
    elif "weather" in prompt.lower():
        #speak_response("I can provide the latest weather updates. What location are you interested in?")
        return "I can provide the latest weather updates. What location are you interested in?"
    elif "time" in prompt.lower():
        #speak_response("I can tell you the current time. Just ask!")
        return "I can tell you the current time. Just ask!"
    elif "music" in prompt.lower():
        #speak_response("What music would you like to play?")
        return "What music would you like to play?"
    else:
        try:
            # Correctly specify the arguments as keyword arguments
            return ollama.generate(prompt=prompt, model=ollama_model)
        except Exception as e:
            return f"Error occurred while processing your request: {str(e)}"

def get_todo_list():
    # Assuming this function retrieves tasks from a database or a file.
    # Ensure that tasks are stored as complete strings in the data source.

    pending_tasks=[]  # Replace with logic to get pending tasks from storage
    completed_tasks=[]  # Replace with logic to get completed tasks from storage

    # Example: if using a text file
    with open("todo.txt", "r") as file:
        for line in file:
            line=line.strip()
            if line:
                pending_tasks.append(line)  # Store as full lines

    return pending_tasks, completed_tasks

def process_input():
    while True:
        user_input=input("You: ").strip()

        # Check for termination commands
        if any(command in user_input.lower() for command in ["quit", "exit", "sleep", "deactivate"]):
            terminate_program()
            break

        # Command: help
        elif user_input == "help":
            handle_help_command()

        # Command: switch to persona
        elif user_input.startswith("switch to "):
            persona_name=user_input.split("switch to ")[1].strip().title()
            switch_persona(persona_name)
            print(Fore.GREEN + f"Zara: Switched to {persona_name}.")

        # Command: open application
        elif user_input.startswith("open "):
            app_to_open=user_input[5:].strip()
            open_application(app_to_open)

        # Command: weather
        elif "tell me about the weather" in user_input.lower() or "what's the weather today" in user_input.lower():
            response=get_weather()
            speak_response(response)
            print(Fore.GREEN + f"Zara: {response}")

        # Command: current time
        elif user_input.startswith("what time is it") or "current time" in user_input:
            current_time=datetime.now().strftime("%H:%M:%S")
            speak_response(current_time)
            print(Fore.GREEN + f"Zara: The current time is {current_time}")

        # Command: play music or video
        elif user_input.startswith(("play", "stream", "start", "broadcast")):
            command, song_info=user_input.split(maxsplit=1)
            if "on" in song_info:
                parts=list(map(str.strip, song_info.rsplit("on", 1)))
                if len(parts) == 2:
                    title, platform=parts
                    platform=platform.lower()

                    if platform == "spotify":
                        play_song_on_spotify(title)
                    elif platform == "youtube":
                        play_video_on_youtube(title)
                    else:
                        print(Fore.GREEN + f"Unsupported platform: {platform}. Supported platforms are 'spotify' and 'youtube'.")
                else:
                    print(Fore.GREEN + "Invalid command format. Please specify the song title and platform.")
            else:
                play_video_on_youtube(song_info)

        # Command: Google search
        elif user_input.startswith("google "):
            query=user_input[7:]
            google_search(query)

        # Command: general search
        elif user_input.startswith("search "):
            query=user_input[7:].strip()
            handle_search(query)

        # Command: add task
        elif user_input.startswith("add "):
            task=user_input[4:].strip()
            add_to_todo(task)
            print(Fore.GREEN + f"Zara: Added to to-do list: {task}")

        # Command: display schedule
        elif user_input == "schedule":
            pending_tasks, completed_tasks=get_todo_list()
            print(Fore.GREEN + "Zara: Here are your pending tasks:")
            for index, task in enumerate(pending_tasks):
                speak_response(task)
                print(Fore.GREEN + f"{index + 1}. {task}")

        # Command: delete task
        elif user_input.startswith("delete "):
            try:
                task_index=int(user_input.split("delete ")[1]) - 1
                delete_task(task_index)
                speak_response("Task Deleted!")
                print(Fore.GREEN + f"Zara: Deleted task {task_index + 1}.")
            except ValueError:
                speak_response("Please specify a valid task number")
                print(Fore.GREEN + "Zara: Please specify a valid task number.")

        # Command: mark task completed
        elif user_input.startswith("mark completed "):
            try:
                task_index=int(user_input.split("mark completed ")[1]) - 1
                mark_task_completed(task_index)
                speak_response("Marked task as completed")
                print(Fore.GREEN + f"Zara: Marked task {task_index + 1} as completed.")
            except ValueError:
                speak_response("Please specify a valid task number")
                print(Fore.GREEN + "Zara: Please specify a valid task number.")

        # Command: get news
        elif user_input == "news":
            news_articles=fetch_news()
            speak_response("Here are the latest news articles")
            print(Fore.GREEN + "Zara: Here are the latest news articles:")
            for article in news_articles:
                speak_response(article)
                print(Fore.GREEN + f"- {article}")

        # Command: send WhatsApp message
        elif user_input.startswith("send "):
            try:
                msg_parts=user_input.split(" to ")
                message=msg_parts[0][5:]
                contact=msg_parts[1]
                send_whatsapp_message(message, contact)
                speak_response("Message Sent!")
                print(Fore.GREEN + f"Zara: Message '{message}' sent to {contact}.")
            except IndexError:
                speak_response("Please specify a message and a contact")
                print(Fore.GREEN + "Zara: Please specify a message and a contact.")

        # If none of the above commands match, respond to the input as a query
        else:
            response=get_response_with_prompt(user_input)  # Pass input to response function
            if isinstance(response, dict):
                response_text=response.get('response', 'Sorry, I didn\'t understand that.')
            else:
                response_text=response  # Handle response as a plain string
            speak_response(response_text)
            print(Fore.GREEN + f"Zara: {response_text}")

def play_music(file_path):
    pygame.mixer.music.load(file_path)
    pygame.mixer.music.play(-1)
    pygame.mixer.music.set_volume(.1)

def pause_music():
    if pygame.mixer.music.get_busy():
        pygame.mixer.music.pause()

def unpause_music():
    if not pygame.mixer.music.get_busy():
        pygame.mixer.music.unpause()

def change_music():
    change_background_music(BACKGROUND_MUSIC_FILES, pygame.mixer.music)

class NullWriter(object):
    def write(self, arg):
        pass

sys.stdout=NullWriter()
sys.stdout=sys.__stdout__
init(autoreset=True)
pygame.mixer.init()

ascii_art="""
 #######    ##       ####     ##              ##   ##    #####    ####     ##### 
      ##   ####     ##  ##   ####             ##   ##   ##       ##  ##   ##     
     ##    ## ##   ##   ##   ## ##            ##   ##  ##       ##   ##  ##      
    ##    ##   ##  ######   ##   ##           #######  ######   ######   ######  
   ##     #######  ####     #######           ##   ##  ##       ####     ##      
  ##      ##   ##  ## ##    ##   ##           ##   ##  ##       ## ##    ##      
 #######  ##   ##  ##  ##   ##   ##           ##   ##  #######  ##  ##   ####### 
"""

def show_loading_animation():
    print(ascii_art)
    print("ZARA: Hello there!")
    print("\n")

def handle_help_command():
    print(Fore.GREEN + "\nAvailable Commands:")
    print(Fore.GREEN + "    - help: Display this help message")
    print(Fore.GREEN + "    - open [app_name]: Open a specified application")
    print(Fore.GREEN + "      Example: 'open Chrome'")
    print(Fore.GREEN + "    - play music: Play relaxing music on YouTube")
    print(Fore.GREEN + "    - weather: Get the current weather")
    print(Fore.GREEN + "    - add [task]: Add a task to the to-do list")
    print(Fore.GREEN + "      Example: 'add Complete homework on Math'")
    print(Fore.GREEN + "    - schedule: Display today's schedule")
    print(Fore.GREEN + "    - send [message] to [contact]: Send a WhatsApp message")
    print(Fore.GREEN + "    - news: Get the latest news")
    print(Fore.GREEN + "    - quit: Exit the program")

def switch_persona(name):
    # Placeholder for persona switch functionality
    print(Fore.GREEN + f"Switched to {name}.")

def delete_task(task_index):
    # Implement logic to delete a task from the to-do list.
    tasks=get_todo_list()
    if 0 <= task_index < len(tasks[0]):
        del tasks[0][task_index]  # Assuming pending tasks are at index 0
        save_todo_list(tasks)  # Save the updated tasks back to file or database
        print(Fore.GREEN + f"Deleted task {task_index + 1}.")
    else:
        print(Fore.GREEN + "Task index out of range.")

def mark_task_completed(task_index):
    # Implement logic to mark a task as completed.
    tasks=get_todo_list()
    if 0 <= task_index < len(tasks[0]):
        completed_task=tasks[0][task_index]
        tasks[0].remove(completed_task)
        tasks[1].append(completed_task)  # Assuming completed tasks are at index 1
        save_todo_list(tasks)  # Save the updated tasks back to file or database
        print(Fore.GREEN + f"Marked task {task_index + 1} as completed.")
    else:
        print(Fore.GREEN + "Task index out of range.")

def save_todo_list(tasks):
    # Implement logic to save the updated task list to a file.
    with open("todo.txt", "w") as file:
        for task in tasks[0]:
            file.write(f"{task}\n")
        for task in tasks[1]:
            file.write(f"{task}\n")  # Save completed tasks if needed

def remind_todo():
    while True:
        time.sleep(60 * 30)
        tasks=get_todo_list()
        if tasks:
            pending_tasks="\n".join(f"- {task}" for task in tasks)
            speak_response("You have pending tasks in your to-do list.")
            print(Fore.GREEN + f"Zara: You have pending tasks:\n{pending_tasks}")

if __name__ == "__main__":
    threading.Thread(target=remind_todo, daemon=True).start()
    is_music_playing=False
    play_music(AUDIO_FILE_PATH)
    is_music_playing=True

    show_loading_animation()
    process_input()