noxrepo / pox

The POX network software platform
https://noxrepo.github.io/pox-doc/html/
Apache License 2.0
628 stars 473 forks source link

Use more-clever cookie for CookieGuard #245

Open MurphyMc opened 4 years ago

MurphyMc commented 4 years ago

POX CookieGuard currently just uses a single secret. While PCG is not really meant as a generic authentication mechanism, we could pretty easily limit the secret to a given client IP address. Especially if the web server is running without encryption, there may be some benefit to this...

ljluestc commented 7 months ago

class CookieGuard (object):
  """
  Cookie-based guard that keeps one secret per client IP address.

  A request is authorized only when the cookie named COOKIE_NAME carries
  the secret registered (via set_secret) for the peer's IP address.

  NOTE(review): the handlers call connection.getpeername(),
  connection.send_response()/end_headers() and connection.wfile on the
  same object, and read request.headers / request.rfile -- confirm the
  caller passes objects that provide all of these.
  """
  COOKIE_NAME = 'pox-cookie-guard'

  def __init__ (self):
    # Maps client IP address -> secret expected from that client
    self.secrets = {}

  def set_secret (self, ip_address, secret):
    """
    Register (or replace) the secret expected from ip_address
    """
    self.secrets[ip_address] = secret

  def get_secret (self, ip_address):
    """
    Return the secret registered for ip_address, or None if there is none
    """
    return self.secrets.get(ip_address)

  def send_secret (self, connection, secret):
    """
    Send secret to the client as a Set-Cookie header on connection
    """
    connection.send_header('Set-Cookie', '{}={}; Path=/'.format(self.COOKIE_NAME, secret))

  def receive_secret (self, headers):
    """
    Extract the guard cookie's value from the request headers

    Returns the value of the COOKIE_NAME cookie, or None when there is no
    Cookie header or it does not contain that cookie.
    """
    cookie = headers.get('Cookie')
    if not cookie:
      return None
    for pair in cookie.split(';'):
      # partition() splits on the first '=' only, so values which contain
      # '=' (e.g. base64 padding) survive intact, and pairs without any
      # '=' are skipped instead of raising ValueError.
      key, sep, value = pair.strip().partition('=')
      if sep and key == self.COOKIE_NAME:
        return value
    return None

  def verify_secret (self, ip_address, secret):
    """
    Check secret against the secret registered for ip_address

    Returns True only if a non-empty secret is registered for ip_address
    and it equals the given secret; False otherwise.
    """
    import hmac

    stored_secret = self.get_secret(ip_address)
    if not stored_secret:
      return False
    if isinstance(stored_secret, str) and isinstance(secret, str):
      # Constant-time comparison so response timing does not reveal how
      # much of a guessed secret was correct.  Encoding first because
      # compare_digest() only accepts ASCII when given str arguments.
      return hmac.compare_digest(stored_secret.encode('utf-8'),
                                 secret.encode('utf-8'))
    return bool(stored_secret == secret)

  def _reply (self, connection, status, body):
    """
    Send a header-less response with the given status code and bytes body
    """
    connection.send_response(status)
    connection.end_headers()
    connection.wfile.write(body)

  def _rejection (self, ip_address, request):
    """
    Return None if request is authorized for ip_address

    Otherwise returns the (status, body) pair to reject it with: 401 when
    no guard cookie was sent, 403 when it does not match.
    """
    secret = self.receive_secret(request.headers)
    if not secret:
      return 401, b'Error: Unauthorized\n'
    if not self.verify_secret(ip_address, secret):
      return 403, b'Error: Forbidden\n'
    return None

  def handle_request (self, connection, request):
    """
    Answer a request with 200, 401 or 403 depending on its guard cookie
    """
    ip_address = connection.getpeername()[0]
    rejection = self._rejection(ip_address, request)
    if rejection is None:
      self._reply(connection, 200, b'Success: Authorized\n')
    else:
      self._reply(connection, *rejection)

  def handle_post (self, connection, request):
    """
    Answer a POST request with 200, 401 or 403 depending on its cookie

    The request body is read (Content-Length bytes) and discarded.  A
    malformed or negative Content-Length is answered with 400.
    """
    ip_address = connection.getpeername()[0]
    try:
      content_length = int(request.headers.get('Content-Length', 0))
    except (TypeError, ValueError):
      content_length = -1
    if content_length < 0:
      # A negative length would make rfile.read() read until EOF
      self._reply(connection, 400, b'Error: Bad Request\n')
      return

    # The body is not used; it is consumed as raw bytes so that a body
    # which is not valid UTF-8 cannot fail before the cookie is checked.
    request.rfile.read(content_length)

    rejection = self._rejection(ip_address, request)
    if rejection is None:
      # Process the POST request
      self._reply(connection, 200, b'Success: POST request processed\n')
    else:
      self._reply(connection, *rejection)