EloiStree / 2024_05_23_HelloStreamDeckGirleek

In the context of Girleek and a QA testing workshop, we are going to learn how to remote-control a game with code, in a Stream Deck hub way.
0 stars 0 forks source link

Flush: Du code en préparation d'un exercice non utilisé #40

Open EloiStree opened 1 month ago

EloiStree commented 1 month ago

(Et sans Scratch, ça ressemble à quoi ?)

Envoyer un entier via UDP

Codes here: https://github.com/EloiStree/2024_05_17_BasicPythonUdpWebsocketIID/tree/main/SendUdpInteger

Envoyer un entier via UDP, sous forme d'octets, au serveur local créé plus tôt.

import socket
import struct

UDP_IP = "127.0.0.1"
UDP_PORT = 404

def fetch_integer():
    """Prompt for one integer on stdin and forward it via ``push_integer``.

    Input that cannot be parsed as an integer, or that does not fit the
    signed 32-bit range that ``push_integer`` packs with ``'<i'``, is
    reported on stdout and skipped, so one bad entry neither disappears
    silently nor crashes the calling loop.
    """
    user_input = input("Enter an integer: ")
    try:
        integer_value = int(user_input)
    except ValueError:
        print(f"Ignored {user_input!r}: not a valid integer")
        return

    # struct.pack('<i', ...) raises struct.error outside this range, which
    # the original ValueError-only handler never caught.
    if not -2**31 <= integer_value <= 2**31 - 1:
        print(f"Ignored {integer_value}: does not fit in a signed 32-bit integer")
        return

    push_integer(integer_value)

#####################################
# One UDP socket, created once and reused by every push_integer call
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)


def push_integer(integer_value):
    """Send ``integer_value`` to (UDP_IP, UDP_PORT) as 4 little-endian bytes.

    The value is packed as a signed 32-bit integer (``'<i'``), so values
    outside that range make ``struct.pack`` raise ``struct.error``.
    """
    payload = struct.pack('<i', integer_value)
    destination = (UDP_IP, UDP_PORT)
    sock.sendto(payload, destination)
    print(f"Sent integer {integer_value} to {UDP_IP}:{UDP_PORT}")

if __name__ == "__main__":
    # Keep prompting for integers until the process is interrupted.
    while True:
        fetch_integer()

Envoyer un entier via WebSocket

Codes here: https://github.com/EloiStree/2024_05_17_BasicPythonUdpWebsocketIID/tree/main/SendWebsocketInteger


import asyncio
import websockets
import random
import time
# WebSocket endpoint the random integers are sent to.
uri = "ws://localhost:7073"

# Inclusive bounds of the random integer that is sent.
min_int, max_int = -100, 100
# Bounds, in seconds, of the random pause between two sends.
min_sleep, max_sleep = 4, 10

async def send_random_integers(uri):
    """Connect to ``uri`` and send a random integer every few seconds, forever.

    Each value is drawn from [min_int, max_int] and sent as one binary
    message of 4 bytes (little-endian, signed). The pause between two
    messages is drawn uniformly from [min_sleep, max_sleep] seconds.
    """
    async with websockets.connect(uri) as connection:
        while True:
            value = random.randint(min_int, max_int)
            payload = value.to_bytes(4, byteorder='little', signed=True)
            await connection.send(payload)
            print(f"Sent: {value}")

            pause_seconds = random.uniform(min_sleep, max_sleep)
            await asyncio.sleep(pause_seconds)

# asyncio.run() creates a fresh event loop, runs the coroutine and closes the
# loop afterwards. Calling asyncio.get_event_loop() with no running loop is
# deprecated in recent Python versions.
asyncio.run(send_random_integers(uri))

Exemple de lecture et d'écriture

Liste de lecture convertie en entiers : https://github.com/EloiStree/2024_05_14_IntegerIndexDateToRemoteControl/tree/main/Python

Envoyer les touches du clavier sous forme d'entiers

Source: https://github.com/EloiStree/2024_05_14_IntegerIndexDateToRemoteControl/blob/main/Python/Device2Gate/PushKeyboardAsInteger.py


import socket
import random
import time
import keyboard
port = 3614  # UDP port on localhost that the key codes are sent to
time_wait = 1  # NOTE(review): not referenced anywhere in the visible script
# Single UDP socket reused for every key event.
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)

# Last state recorded for each key name: 1 = held down, 0 = released.
dico_previous_state = {}

# Set to True to emit the 3xx/4xx code range instead of the 1xx/2xx range.
use_debug_info = False

# Per-key offset added to the base of whichever range is active.
_key_offsets = {
    'up': 1, 'down': 2, 'left': 3, 'right': 4, 'space': 5,
    'a': 11, 'z': 12, 'e': 13, 'r': 14, 't': 15,
    'q': 16, 's': 17, 'd': 18, 'f': 19,
}
_up_base, _down_base = (300, 400) if use_debug_info else (100, 200)

# dico_to_int_up holds the codes sent by on_arrow_press and dico_to_int_down
# the codes sent by on_arrow_release.
dico_to_int_up = {
    key_name: _up_base + offset for key_name, offset in _key_offsets.items()
}
dico_to_int_down = {
    key_name: _down_base + offset for key_name, offset in _key_offsets.items()
}

def push_bytes_on_udp_port(int_value):
    """Send ``int_value`` to localhost:``port`` as 4 little-endian bytes.

    The value is encoded as a *signed* 32-bit integer so that it matches the
    ``'<i'`` format the UDP listeners in this document decode. Non-negative
    values below 2**31 produce exactly the same bytes as before; negative
    values, which previously raised ``OverflowError``, are now sent too.
    """
    data = int_value.to_bytes(4, 'little', signed=True)
    sock.sendto(data, ('localhost', port))

def on_arrow_press(string_key):
    """Send the ``dico_to_int_up`` code of ``string_key`` once per press.

    The recorded state filters repeated calls: nothing is sent while the key
    is already recorded as held. A key with no recorded state counts as
    released.
    """
    already_held = dico_previous_state.get(string_key, 0) != 0
    if already_held:
        return

    dico_previous_state[string_key] = 1
    code = dico_to_int_up[string_key]
    print(f"Arrow {string_key} key pressed{code}")
    push_bytes_on_udp_port(code)

def on_arrow_release(string_key):
    """Send the ``dico_to_int_down`` code of ``string_key`` once per release.

    Nothing is sent while the key is already recorded as released. A key with
    no recorded state counts as held, so the first call for such a key sends
    its release code.
    """
    was_held = dico_previous_state.get(string_key, 1) == 1
    if not was_held:
        return

    dico_previous_state[string_key] = 0
    code = dico_to_int_down[string_key]
    print(f"Arrow {string_key} key released {code}")
    push_bytes_on_udp_port(code)

# Poll the keyboard forever and turn state changes into UDP key codes.
while True:
    # First pass: keys that are currently held.
    for key_name in dico_to_int_up:
        if keyboard.is_pressed(key_name):
            on_arrow_press(key_name)

    # Second pass: keys that are currently not held.
    for key_name in dico_to_int_down:
        if keyboard.is_pressed(key_name):
            continue
        on_arrow_release(key_name)

    # Very short pause between two polling rounds.
    time.sleep(0.0001)

Faire bouger 4 manettes Xbox en même temps

import pyxinput
import time
import socket
import struct
import random

#pyxinput.test_virtual()

 # AxisLx          , Left Stick X-Axis
 # AxisLy          , Left Stick Y-Axis
 # AxisRx          , Right Stick X-Axis
 # AxisRy          , Right Stick Y-Axis
 # BtnBack         , Menu/Back Button
 # BtnStart        , Start Button
 # BtnA            , A Button
 # BtnB            , B Button
 # BtnX            , X Button
 # BtnY            , Y Button
 # BtnThumbL       , Left Thumbstick Click
 # BtnThumbR       , Right Thumbstick Click
 # BtnShoulderL    , Left Shoulder Button
 # BtnShoulderR    , Right Shoulder Button
 # Dpad            , Set Dpad Value (0 = Off, Use DPAD_### Constants) 1 up 2 down 4 left 8 right
 # TriggerL        , Left Trigger
 # TriggerR        , Right Trigger

# Address the UDP listener binds to: empty host, port 7000.
listener_udp_address = ('', 7000)

# NOTE(review): the two offsets below are not referenced in the visible script.
int_offset = 450000
int_player_offset = 100

# Number of controller slots created at start-up (keys "1".."max_input").
max_input = 3

# Filled at start-up, keyed by the slot number as a string:
# pv holds the pyxinput.vController objects, pr the pyxinput.rController ones.
pv = {}
pr = {}

def set_value_for_all(value_key, value):
    """Call ``set_value(value_key, value)`` on every controller stored in ``pv``."""
    for controller in pv.values():
        controller.set_value(value_key, value)

# Create one vController and one rController per slot, keyed by the slot
# number as a string ("1", "2", ...).
for slot in range(1, max_input + 1):
    slot_key = str(slot)
    pv[slot_key] = pyxinput.vController()
    pr[slot_key] = pyxinput.rController(slot)

def release_all():
    """Set every button, the D-pad, both triggers and all stick axes to 0.

    The value is applied to all controllers through ``set_value_for_all``,
    one control at a time, in the order listed below.
    """
    control_names = (
        'BtnA', 'BtnB', 'BtnX', 'BtnY',
        'Dpad',
        'BtnThumbL', 'BtnThumbR',
        'BtnShoulderL', 'BtnShoulderR',
        'BtnStart', 'BtnBack',
        'TriggerL', 'TriggerR',
        'AxisLx', 'AxisLy', 'AxisRx', 'AxisRy',
    )
    for control_name in control_names:
        set_value_for_all(control_name, 0)

def get_random_11():
    """Return a random float drawn uniformly between -1.0 and 1.0."""
    lower, upper = -1.0, 1.0
    return random.uniform(lower, upper)

def all_player_move_randomly():
    """Give each controller in ``pv`` its own random value on all four stick axes."""
    stick_axes = ('AxisLx', 'AxisLy', 'AxisRx', 'AxisRy')
    for controller in pv.values():
        for axis_name in stick_axes:
            controller.set_value(axis_name, get_random_11())

# Maps a received integer to the action applied to every controller.
#   0 / 1      : release everything / random stick positions
#   10xx/20xx  : press / release of a button or D-pad direction
#   202x/204x  : left / right trigger level in steps of 0.1
#   3100-3110, 3200-3210, 3300-3310, 3400-3410 : fixed positions for
#                AxisLx, AxisLy, AxisRx, AxisRy; the ..11 code of each group
#                sets a random position on that same axis.
# The lambdas defer name lookup to call time.
button_dico_all = {
    0: lambda: release_all(),
    1: lambda: all_player_move_randomly(),

    # ABXY
    1001: lambda: set_value_for_all('BtnA', 1),
    2001: lambda: set_value_for_all('BtnA', 0),
    1002: lambda: set_value_for_all('BtnB', 1),
    2002: lambda: set_value_for_all('BtnB', 0),
    1003: lambda: set_value_for_all('BtnX', 1),
    2003: lambda: set_value_for_all('BtnX', 0),
    1004: lambda: set_value_for_all('BtnY', 1),
    2004: lambda: set_value_for_all('BtnY', 0),

    # DPAD (values used here: 1 up, 2 down, 4 left, 8 right, 0 off)
    1005: lambda: set_value_for_all('Dpad', 2),  # down
    2005: lambda: set_value_for_all('Dpad', 0),
    1006: lambda: set_value_for_all('Dpad', 1),  # up
    2006: lambda: set_value_for_all('Dpad', 0),
    1007: lambda: set_value_for_all('Dpad', 4),  # left
    2007: lambda: set_value_for_all('Dpad', 0),
    1008: lambda: set_value_for_all('Dpad', 8),  # right
    2008: lambda: set_value_for_all('Dpad', 0),

    # THUMB
    1009: lambda: set_value_for_all('BtnThumbL', 1),
    2009: lambda: set_value_for_all('BtnThumbL', 0),
    1010: lambda: set_value_for_all('BtnThumbR', 1),
    2010: lambda: set_value_for_all('BtnThumbR', 0),

    # SHOULDER
    1011: lambda: set_value_for_all('BtnShoulderL', 1),
    2011: lambda: set_value_for_all('BtnShoulderL', 0),
    1012: lambda: set_value_for_all('BtnShoulderR', 1),
    2012: lambda: set_value_for_all('BtnShoulderR', 0),

    # START BACK
    1013: lambda: set_value_for_all('BtnStart', 1),
    2013: lambda: set_value_for_all('BtnStart', 0),
    1014: lambda: set_value_for_all('BtnBack', 1),
    2014: lambda: set_value_for_all('BtnBack', 0),

    # LEFT TRIGGER
    2020: lambda: set_value_for_all('TriggerL', 0),
    2021: lambda: set_value_for_all('TriggerL', 0.1),
    2022: lambda: set_value_for_all('TriggerL', 0.2),
    2023: lambda: set_value_for_all('TriggerL', 0.3),
    2024: lambda: set_value_for_all('TriggerL', 0.4),
    2025: lambda: set_value_for_all('TriggerL', 0.5),
    2026: lambda: set_value_for_all('TriggerL', 0.6),
    2027: lambda: set_value_for_all('TriggerL', 0.7),
    2028: lambda: set_value_for_all('TriggerL', 0.8),
    2029: lambda: set_value_for_all('TriggerL', 0.9),
    2030: lambda: set_value_for_all('TriggerL', 1),

    # RIGHT TRIGGER
    2040: lambda: set_value_for_all('TriggerR', 0),
    2041: lambda: set_value_for_all('TriggerR', 0.1),
    2042: lambda: set_value_for_all('TriggerR', 0.2),
    2043: lambda: set_value_for_all('TriggerR', 0.3),
    2044: lambda: set_value_for_all('TriggerR', 0.4),
    2045: lambda: set_value_for_all('TriggerR', 0.5),
    2046: lambda: set_value_for_all('TriggerR', 0.6),
    2047: lambda: set_value_for_all('TriggerR', 0.7),
    2048: lambda: set_value_for_all('TriggerR', 0.8),
    2049: lambda: set_value_for_all('TriggerR', 0.9),
    2050: lambda: set_value_for_all('TriggerR', 1),

    # LEFT STICK X
    3100: lambda: set_value_for_all('AxisLx', -1),
    3101: lambda: set_value_for_all('AxisLx', -0.90),
    3102: lambda: set_value_for_all('AxisLx', -0.75),
    3103: lambda: set_value_for_all('AxisLx', -0.5),
    3104: lambda: set_value_for_all('AxisLx', -0.25),
    3105: lambda: set_value_for_all('AxisLx', 0),
    3106: lambda: set_value_for_all('AxisLx', 0.25),
    3107: lambda: set_value_for_all('AxisLx', 0.5),
    3108: lambda: set_value_for_all('AxisLx', 0.75),
    3109: lambda: set_value_for_all('AxisLx', 0.90),
    3110: lambda: set_value_for_all('AxisLx', 1),
    # Was 'AxisLy' (copy-paste): the random code of a group drives its own axis.
    3111: lambda: set_value_for_all('AxisLx', get_random_11()),

    # LEFT STICK Y
    3200: lambda: set_value_for_all('AxisLy', -1),
    3201: lambda: set_value_for_all('AxisLy', -0.90),
    3202: lambda: set_value_for_all('AxisLy', -0.75),
    3203: lambda: set_value_for_all('AxisLy', -0.5),
    3204: lambda: set_value_for_all('AxisLy', -0.25),
    3205: lambda: set_value_for_all('AxisLy', 0),
    3206: lambda: set_value_for_all('AxisLy', 0.25),
    3207: lambda: set_value_for_all('AxisLy', 0.5),
    3208: lambda: set_value_for_all('AxisLy', 0.75),
    3209: lambda: set_value_for_all('AxisLy', 0.90),
    3210: lambda: set_value_for_all('AxisLy', 1),
    3211: lambda: set_value_for_all('AxisLy', get_random_11()),

    # RIGHT STICK X
    3300: lambda: set_value_for_all('AxisRx', -1),
    3301: lambda: set_value_for_all('AxisRx', -0.90),
    3302: lambda: set_value_for_all('AxisRx', -0.75),
    3303: lambda: set_value_for_all('AxisRx', -0.5),
    3304: lambda: set_value_for_all('AxisRx', -0.25),
    3305: lambda: set_value_for_all('AxisRx', 0),
    3306: lambda: set_value_for_all('AxisRx', 0.25),
    3307: lambda: set_value_for_all('AxisRx', 0.5),
    3308: lambda: set_value_for_all('AxisRx', 0.75),
    3309: lambda: set_value_for_all('AxisRx', 0.90),
    3310: lambda: set_value_for_all('AxisRx', 1),
    # Was 'AxisLy' (copy-paste).
    3311: lambda: set_value_for_all('AxisRx', get_random_11()),

    # RIGHT STICK Y
    3400: lambda: set_value_for_all('AxisRy', -1),
    3401: lambda: set_value_for_all('AxisRy', -0.90),
    3402: lambda: set_value_for_all('AxisRy', -0.75),
    3403: lambda: set_value_for_all('AxisRy', -0.5),
    3404: lambda: set_value_for_all('AxisRy', -0.25),
    3405: lambda: set_value_for_all('AxisRy', 0),
    3406: lambda: set_value_for_all('AxisRy', 0.25),
    3407: lambda: set_value_for_all('AxisRy', 0.5),
    3408: lambda: set_value_for_all('AxisRy', 0.75),
    3409: lambda: set_value_for_all('AxisRy', 0.90),
    3410: lambda: set_value_for_all('AxisRy', 1),
    # Was 'AxisLy' (copy-paste).
    3411: lambda: set_value_for_all('AxisRy', get_random_11()),
}

# Create the UDP socket used to receive the commands
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)

# Bind it to the configured listening address and port
sock.bind(listener_udp_address)

def integer_to_xbox_input(int_value):
    """Run the action registered for ``int_value`` in ``button_dico_all``.

    A code with no registered action is only reported on stdout.
    """
    action = button_dico_all.get(int_value)
    if action is None:
        print(f"Received {int_value} not found")
        return

    print(f"Received {int_value}")
    action()

print(f"Listening for incoming data... port {listener_udp_address[1]}")
# Receive datagrams forever and forward the value field of each valid packet.
while True:
    byte_received, address = sock.recvfrom(1024)  # Adjust the buffer size as needed
    print(f'Received {len(byte_received)} bytes from {address}: {byte_received}')

    # Only 16-byte datagrams are handled; recvfrom always returns bytes, so
    # no None check is needed.
    if len(byte_received) != 16:
        continue

    # Little-endian layout: int32 index | int32 value | uint64 milliseconds.
    # The last field is unpacked as unsigned ('Q') to match its name.
    index, value, ulong_milliseconds = struct.unpack('<iiQ', byte_received)
    print(f"Received Bytes {index} | {value} | { ulong_milliseconds}")
    integer_to_xbox_input(value)

Que faire maintenant avec un entier localement ?

Si votre but est de faire des actions en Python sur le PC, vous pouvez :

Via le script de server donné

Via un message UDP local

C'est bien beau de tout mettre dans le même fichier, mais ça devient vite brouillon.

Il est plus simple de faire un script par action.

Pour cela, il vous faudra créer un écouteur UDP.

Créez un fichier DefaultUdpListener.py

Last version: https://github.com/EloiStree/2024_05_17_BasicPythonUdpWebsocketIID/blob/main/SendUdpInteger/ReceivedUdpInteger.py


def handle_integer_received(integer):
    """Convert ``integer`` with ``int()`` and print the result.

    This is the hook to replace with the action to run for each received
    integer.
    """
    print(int(integer))

import socket
import struct

def integer_listener_client_udp(host, port):
    """Listen on a UDP port and pass every decoded integer to the handler.

    Three little-endian datagram layouts are recognised by their size:

    * 4 bytes:  ``int32`` value
    * 12 bytes: ``int32`` value, then a ``uint64`` field
    * 16 bytes: ``int32`` index, ``int32`` value, then a ``uint64`` field

    Datagrams of any other size are ignored. For the 16-byte layout the
    second field is forwarded; the first one is the index, as in the
    controller listener of this document, which reads the value from bytes
    4..8. The ``uint64`` field is not used here.

    The function blocks until interrupted with Ctrl+C. The socket is closed
    on every exit path, including a failing ``bind``.

    Args:
        host: Address to bind to, e.g. ``"0.0.0.0"``.
        port: UDP port to bind to.
    """
    # Byte offset of the int32 value for each supported datagram size.
    value_offset_by_size = {4: 0, 12: 0, 16: 4}

    # The context manager closes the socket even when bind() raises, which
    # the previous try/finally (entered only after bind) did not cover.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
        udp_socket.bind((host, port))
        print("UDP integer listener client is now listening on {}:{}".format(host, port))

        try:
            while True:
                data, _address = udp_socket.recvfrom(64)
                offset = value_offset_by_size.get(len(data))
                if offset is None:
                    continue
                received_integer = struct.unpack_from('<i', data, offset)[0]
                handle_integer_received(received_integer)
        except KeyboardInterrupt:
            print("UDP integer listener client stopped.")

# Example usage: listen on every available interface, on an example port
# (change it to the port you want).
if __name__ == "__main__":
    HOST, PORT = "0.0.0.0", 404
    integer_listener_client_udp(HOST, PORT)