Open alfredowshh opened 9 years ago
Le code --> class SSLManager(object): """ Manages SSL certificates.
:param SoftLayer.API.Client client: an API client instance
"""
def __init__(self, client):
#: A valid `SoftLayer.API.Client` object that will be used for all
#: actions.
self.client = client
#: Reference to the SoftLayer_Security_Certificate API object.
self.ssl = self.client['Security_Certificate']
def list_certs(self, method='all'):
""" List all certificates.
:param string method: The type of certificates to list. Options are
'all', 'expired', and 'valid'.
:returns: A list of dictionaries representing the requested SSL certs.
"""
ssl = self.client['Account']
methods = {
'all': 'getSecurityCertificates',
'expired': 'getExpiredSecurityCertificates',
'valid': 'getValidSecurityCertificates'
}
mask = "mask[id, commonName, validityDays, notes]"
func = getattr(ssl, methods[method])
return func(mask=mask)
def add_certificate(self, certificate):
""" Creates a new certificate.
:param dict certificate: A dictionary representing the parts of the
certificate. See SLDN for more information.
"""
return self.ssl.createObject(certificate)
def remove_certificate(self, id):
""" Removes a certificate.
:param integer id: a certificate ID to remove
"""
return self.ssl.deleteObject(id=id)
def edit_certificate(self, certificate):
""" Updates a certificate with the included options.
The provided dict must include an 'id' key and value corresponding to
the certificate ID that should be updated.
:param dict certificate: the certificate to update.
"""
return self.ssl.editObject(certificate, id=certificate['id'])
def get_certificate(self, id):
""" Gets a certificate with the ID specified.
:param integer id: the certificate ID to retrieve
"""
return self.ssl.getObject(id=id)
r = requests.get('http://en.wikipedia.org/wiki/Monty_Python' r.headers {'content-length': '56170', 'x-content-type-options': 'nosniff', 'x-cache': 'HIT from cp1006.eqiad.wmnet, MISS from cp1010.eqiad.wmnet', 'content-encoding': 'gzip', 'age': '3080', 'content-language': 'en', 'vary': 'Accept-Encoding,Cookie', 'server': 'Apache', 'last-modified': 'Wed, 13 Jun 2012 01:33:50 GMT', 'connection': 'close', 'cache-control': 'private, s-maxage=0, max-age=0, must-revalidate', 'date': 'Thu, 14 Jun 2012 12:59:39 GMT', 'content-type': 'text/html; charset=UTF-8', 'x-cache-lookup': 'HIT from cp1006.eqiad.wmnet:3128, MISS from cp1010.eqiad.wmnet:80'} r.request.headers {'Accept-Encoding': 'identity, deflate, compress, gzip', 'Accept': '/', 'User-Agent': 'python-requests/1.2.0'} requests.get('https://kennethreitz.com', verify=True) requests.exceptions.SSLError: hostname 'kennethreitz.com' doesn't match either of '*.herokuapp.com', 'herokuapp.com' requests.get('https://github.com', verify=True) <Response [200]> requests.get('https://kennethreitz.com', verify=False) <Response [200]> requests.get('https://kennethreitz.com', cert=('/path/server.crt', '/path/key')) <Response [200]> requests.get('https://kennethreitz.com', cert='/wrong_path/server.pem') SSLError: [Errno 336265225] _ssl.c:347: error:140B0009:SSL routines:SSL_CTX_use_PrivateKey_file:PEM lib
!/usr/bin/env python
-- coding: utf-8 --
_author = "Adrien Pujol - http://www.crashdump.fr/" copyright = "Copyright 2013, Adrien Pujol" license = "Mozilla Public License" version = "0.3" email = "adrien.pujol@crashdump.fr" status = "Development" doc = "Check a TLS certificate validity." import argparse import socket from datetime import datetime import time try:
Try to load pyOpenSSL first
# aptitude install python-dev && pip install pyopenssl from OpenSSL import SSL PYOPENSSL = True
except ImportError:
Else, fallback on standard ssl lib (doesn't support SNI)
import ssl PYOPENSSL = False
CA_CERTS = "/etc/ssl/certs/ca-certificates.crt" def exit_error(errcode, errtext): print errtext exit(errcode) def pyopenssl_check_callback(connection, x509, errnum, errdepth, ok): ''' callback for pyopenssl ssl check''' if x509.get_subject().commonName == HOST: if x509.has_expired(): exit_error(1, 'Error: Certificate has expired!') else: print pyopenssl_check_expiration(x509.get_notAfter()) if not ok: return False return ok def pyopenssl_check_expiration(asn1): ''' Return the numbers of day before expiration. False if expired.''' try: expire_date = datetime.strptime(asn1, "%Y%m%d%H%M%SZ") except: exit_error(1, 'Certificate date format unknow.')
expire_in = expire_date - datetime.now()
if expire_in.days > 0:
return expire_in.days
else:
return False
def pyssl_check_hostname(cert, hostname): ''' Return True if valid. False is invalid ''' if 'subjectAltName' in cert: for typ, val in cert['subjectAltName']:
if typ == 'DNS' and val.startswith('*'):
if val[2:] == hostname.split('.', 1)[1]:
return True
# Normal hostnames
elif typ == 'DNS' and val == hostname:
return True
else:
return False
def pyssl_check_expiration(cert): ''' Return the numbers of day before expiration. False if expired. ''' if 'notAfter' in cert: try: expire_date = datetime.strptime(cert['notAfter'], "%b %d %H:%M:%S %Y %Z") except: exit_error(1, 'Certificate date format unknow.') expire_in = expire_date - datetime.now() if expire_in.days > 0: return expire_in.days else: return False def main(): parser = argparse.ArgumentParser() parser.add_argument('host', help='specify an host to connect to') parser.add_argument('-p', '--port', help='specify a port to connect to', type=int, default=443) args = parser.parse_args()
global HOST, PORT
HOST = args.host
PORT = args.port
try:
socket.getaddrinfo(HOST, PORT)[0][4][0]
except socket.gaierror as e:
exit_error(1, e)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((HOST, PORT)) if not PYOPENSSL: try: ssl_sock = ssl.wrap_socket(sock, cert_reqs=ssl.CERT_REQUIRED, ca_certs=CA_CERTS, ciphers=("HIGH:-aNULL:-eNULL:" "-PSK:RC4-SHA:RC4-MD5"))
cert = ssl_sock.getpeercert()
if not pyssl_check_hostname(cert, HOST):
print 'Error: Hostname does not match!'
print pyssl_check_expiration(cert)
sock = ssl_sock.unwrap()
except ssl.SSLError as e:
exit_error(1, e)
# If handled by pyOpenSSL module
else:
try:
ctx = SSL.Context(SSL.TLSv1_METHOD)
ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
pyopenssl_check_callback)
ctx.load_verify_locations(CA_CERTS)
ssl_sock = SSL.Connection(ctx, sock)
ssl_sock.set_connect_state()
ssl_sock.set_tlsext_host_name(HOST)
ssl_sock.do_handshake()
x509 = ssl_sock.get_peer_certificate()
x509name = x509.get_subject()
if x509name.commonName != HOST:
print 'Error: Hostname does not match!'
ssl_sock.shutdown()
except SSL.Error as e:
exit_error(1, e)
sock.close()
if name == "main": main()
context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED context.check_hostname = True context.load_verify_locations("/etc/ssl/certs/ca-bundle.crt") conn = context.wrap_socket(socket.socket(socket.AF_INET), ... server_hostname="www.python.org") conn.connect(("www.python.org", 443)) cert = conn.getpeercert() pprint.pprint(cert) {'OCSP': ('http://ocsp.digicert.com',), 'caIssuers': ('http://cacerts.digicert.com/DigiCertSHA2ExtendedValidationServerCA.crt',), 'crlDistributionPoints': ('http://crl3.digicert.com/sha2-ev-server-g1.crl', 'http://crl4.digicert.com/sha2-ev-server-g1.crl'), 'issuer': ((('countryName', 'US'),), (('organizationName', 'DigiCert Inc'),), (('organizationalUnitName', 'www.digicert.com'),), (('commonName', 'DigiCert SHA2 Extended Validation Server CA'),)), 'notAfter': 'Sep 9 12:00:00 2016 GMT', 'notBefore': 'Sep 5 00:00:00 2014 GMT', 'serialNumber': '01BB6F00122B177F36CAB49CEA8B6B26', 'subject': ((('businessCategory', 'Private Organization'),), (('1.3.6.1.4.1.311.60.2.1.3', 'US'),), (('1.3.6.1.4.1.311.60.2.1.2', 'Delaware'),), (('serialNumber', '3359300'),), (('streetAddress', '16 Allen Rd'),), (('postalCode', '03894-4801'),), (('countryName', 'US'),), (('stateOrProvinceName', 'NH'),), (('localityName', 'Wolfeboro,'),), (('organizationName', 'Python Software Foundation'),), (('commonName', 'www.python.org'),)), 'subjectAltName': (('DNS', 'www.python.org'), ('DNS', 'python.org'), ('DNS', 'pypi.python.org'), ('DNS', 'docs.python.org'), ('DNS', 'testpypi.python.org'), ('DNS', 'bugs.python.org'), ('DNS', 'wiki.python.org'), ('DNS', 'hg.python.org'), ('DNS', 'mail.python.org'), ('DNS', 'packaging.python.org'), ('DNS', 'pythonhosted.org'), ('DNS', 'www.pythonhosted.org'), ('DNS', 'test.pythonhosted.org'), ('DNS', 'us.pycon.org'), ('DNS', 'id.python.org')), 'version': 3} conn.sendall(b"HEAD / HTTP/1.0\r\nHost: linuxfr.org\r\n\r\n") pprint.pprint(conn.recv(1024).split(b"\r\n")) [b'HTTP/1.1 200 OK', b'Date: Sat, 18 Oct 2014 18:27:20 GMT', b'Server: nginx', b'Content-Type: text/html; charset=utf-8', b'X-Frame-Options: SAMEORIGIN', b'Content-Length: 45679', b'Accept-Ranges: bytes', b'Via: 1.1 varnish', b'Age: 2188', b'X-Served-By: cache-lcy1134-LCY', b'X-Cache: HIT', b'X-Cache-Hits: 11', b'Vary: Cookie', b'Strict-Transport-Security: max-age=63072000; includeSubDomains', b'Connection: close', b'', b''] -5- automatic certification validate % openssl req -new -x509 -days 365 -nodes -out cert.pem -keyout cert.pem Generating a 1024 bit RSA private key .......++++++ .............................++++++
writing new private key to 'cert.pem'
You are about to be asked to enter information that will be incorporated into your certificate request. What you are about to enter is what is called a Distinguished Name or a DN. There are quite a few fields but you can leave some blank For some fields there will be a default value,
If you enter '.', the field will be left blank.
Country Name (2 letter code) [AU]:US State or Province Name (full name) [Some-State]:MyState Locality Name (eg, city) []:Some City Organization Name (eg, company) [Internet Widgits Pty Ltd]:My Organization, Inc. Organizational Unit Name (eg, section) []:My Group Common Name (eg, YOUR name) []:myserver.mygroup.myorganization.com Email Address []:ops@myserver.mygroup.myorganization.com % pprint.pprint(cert) {'issuer': ((('organizationName', 'CAcert Inc.'),), (('organizationalUnitName', 'http://www.CAcert.org'),), (('commonName', 'CAcert Class 3 Root'),)), 'notAfter': 'Jun 7 21:02:24 2013 GMT', 'notBefore': 'Jun 8 21:02:24 2011 GMT', 'serialNumber': 'D3E9', 'subject': ((('commonName', 'linuxfr.org'),),), 'subjectAltName': (('DNS', 'linuxfr.org'), ('othername', '
'), ('DNS', 'linuxfr.org'), ('othername', ' '), ('DNS', 'dev.linuxfr.org'), ('othername', ' '), ('DNS', 'prod.linuxfr.org'), ('othername', ' '), ('DNS', 'alpha.linuxfr.org'), ('othername', ' '), ('DNS', '*.linuxfr.org'), ('othername', ' ')), 'version': 3} import socket, ssl
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) context.load_cert_chain(certfile="mycertfile", keyfile="mykeyfile")
bindsocket = socket.socket() bindsocket.bind(('myaddr.mydomain.com', 10023)) bindsocket.listen(5) while True: newsocket, fromaddr = bindsocket.accept() connstream = context.wrap_socket(newsocket, server_side=True) try: deal_with_client(connstream) finally: connstream.shutdown(socket.SHUT_RDWR) connstream.close() def deal_with_client(connstream): data = connstream.recv(1024)
while data:
if not do_something(connstream, data):
# we'll assume do_something returns False
# when we're finished with client
break
data = connstream.recv(1024)
# finished with client
import sys from twisted.web import xmlrpc, server from twisted.internet import reactor, ssl from twisted.python import log
def makeSSLContext(myKey,trustedCA): '''Returns an ssl Context Object @param myKey a pem formated key and certifcate with for my current host the other end of this connection must have the cert from the CA that signed this key @param trustedCA a pem formated certificat from a CA you trust you will only allow connections from clients signed by this CA and you will only allow connections to a server signed by this CA '''
# our goal in here is to make a SSLContext object to pass to connectSSL
# or listenSSL
# Why these functioins... Not sure...
fd = open(myKey,'r')
theCert = ssl.PrivateCertificate.loadPEM(fd.read())
fd.close()
fd = open(trustedCA,'r')
theCA = ssl.Certificate.loadPEM(fd.read())
fd.close()
ctx = theCert.OPTIONS(theCA)
# Now the options you can set look like Standard OpenSSL Library options
# The SSL protocol to use, one of SSLv23_METHOD, SSLv2_METHOD,
# SSLv3_METHOD, TLSv1_METHOD. Defaults to TLSv1_METHOD.
ctx.method = ssl.SSL.TLSv1_METHOD
# If True, verify certificates received from the peer and fail
# the handshake if verification fails. Otherwise, allow anonymous
# sessions and sessions with certificates which fail validation.
ctx.verify = True
# Depth in certificate chain down to which to verify.
ctx.verifyDepth = 1
# If True, do not allow anonymous sessions.
ctx.requireCertification = True
# If True, do not re-verify the certificate on session resumption.
ctx.verifyOnce = True
# If True, generate a new key whenever ephemeral DH parameters are used
# to prevent small subgroup attacks.
ctx.enableSingleUseKeys = True
# If True, set a session ID on each context. This allows a shortened
# handshake to be used when a known client reconnects.
ctx.enableSessions = True
# If True, enable various non-spec protocol fixes for broken
# SSL implementations.
ctx.fixBrokenPeers = False
return ctx
class Example(xmlrpc.XMLRPC): """An example object to be published. see: http://twistedmatrix.com/projects/web/documentation/howto/xmlrpc.html """
def xmlrpc_echo(self, x):
"""Return all passed args."""
log.msg('xmlrpc call echo, %s'%x)
return x
class Proxy(xmlrpc.Proxy): ''' See: http://twistedmatrix.com/projects/web/documentation/howto/xmlrpc.html this is eacly like the xmlrpc.Proxy included in twisted but you can give it a SSLContext object insted of just accepting the defaults.. ''' def setSSLClientContext(self,SSLClientContext): self.SSLClientContext = SSLClientContext def callRemote(self, method, *args): factory = xmlrpc._QueryFactory( self.path, self.host, method, self.user, self.password, self.allowNone, args) if self.secure: from twisted.internet import ssl try: self.SSLClientContext except NameError: print "Must Set a SSL Context" print "use self.setSSLClientContext() first"
# verfication of who your talking to
# Using the default sslcontext without verification
# Can lead to man in the middle attacks
reactor.connectSSL(self.host, self.port or 443,
factory,self.SSLClientContext)
else:
reactor.connectTCP(self.host, self.port or 80, factory)
return factory.deferred
def printValue(value): print repr(value) reactor.stop()
def printError(error): print 'error', error reactor.stop()
if name == 'main':
# documents
print "running as", sys.argv[1]
if sys.argv[1] == 'server':
log.startLogging(sys.stdout)
ctx = makeSSLContext(myKey='server.pem',trustedCA='cacert.pem')
r = Example()
reactor.listenSSL(7080, server.Site(r),ctx)
reactor.run()
elif sys.argv[1] == 'client':
ctx = makeSSLContext(myKey='client.pem', trustedCA='cacert.pem')
proxy = Proxy('https://localhost:7080/')
proxy.setSSLClientContext(ctx)
proxy.callRemote('echo',
'hello world').addCallbacks(printValue, printError)
reactor.run()
datastore_record_k = ndb.Key('Employee', 'asalieri', 'Address', 1) datastore_record = datastore_record_k.get() key_str = datastore_record.key_str cert_str = datastore_record.cert ssl_server = ssl.wrap_socket(server_sock, server_side=False, keyfile=StringIO.StringIO(key_str), certfile=StringIO.StringIO(cert_str), cert_reqs=ssl.CERT_REQUIRED, ssl_version=ssl.PROTOCOL_TLSv1, ca_certs=CERTIFICATE_FILE)
le code