-
Notifications
You must be signed in to change notification settings - Fork 2
/
_sslcerts.py
191 lines (152 loc) · 7.06 KB
/
_sslcerts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import uuid
from contextlib import closing
from java.io import BufferedInputStream, BufferedReader, FileReader
from java.security import KeyPair, KeyStore, Security
from java.security.cert import CertificateException, CertificateFactory, X509Certificate
from javax.net.ssl import X509KeyManager, X509TrustManager, KeyManagerFactory, SSLContext, TrustManagerFactory
#from org.bouncycastle.openssl import PEMParser # FIXME PEMReader is deprecated, but need to figure out jar specifics
from org.bouncycastle.openssl import PEMReader
from org.bouncycastle.jce.provider import BouncyCastleProvider
# FIXME what happens if reloaded?
Security.addProvider(BouncyCastleProvider())
# build the necessary certificate with a CertificateFactory; this can take the pem format:
# http://docs.oracle.com/javase/7/docs/api/java/security/cert/CertificateFactory.html#generateCertificate(java.io.InputStream)
# not certain if we can include a private key in the pem file; see
# http://stackoverflow.com/questions/7216969/getting-rsa-private-key-from-pem-base64-encoded-private-key-file
# helpful advice for being able to manage ca_certs outside of Java's keystore
# specifically the example ReloadableX509TrustManager
# http://jcalcote.wordpress.com/2010/06/22/managing-a-dynamic-java-trust-store/
# in the case of http://docs.python.org/2/library/ssl.html#ssl.CERT_REQUIRED
# http://docs.python.org/2/library/ssl.html#ssl.CERT_NONE
# https://github.com/rackerlabs/romper/blob/master/romper/trust.py#L15
#
# it looks like CERT_OPTIONAL simply validates certificates if
# provided, probably something in checkServerTrusted - maybe a None
# arg? need to verify as usual with a real system... :)
# http://alesaudate.wordpress.com/2010/08/09/how-to-dynamically-select-a-certificate-alias-when-invoking-web-services/
# is somewhat relevant for managing the keyfile, certfile
def _get_ca_certs_trust_manager(ca_certs):
trust_store = KeyStore.getInstance(KeyStore.getDefaultType())
trust_store.load(None, None)
with open(ca_certs) as f:
cf = CertificateFactory.getInstance("X.509")
for cert in cf.generateCertificates(BufferedInputStream(f)):
trust_store.setCertificateEntry(str(uuid.uuid4()), cert)
tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
tmf.init(trust_store)
return tmf
def _get_openssl_key_manager(cert_file, key_file=None):
paths = [key_file] if key_file else []
paths.append(cert_file)
private_key = None
certs = []
for path in paths:
with closing(FileReader(path)) as reader:
br = BufferedReader(reader)
while True:
obj = PEMReader(br).readObject()
if obj is None:
break
if isinstance(obj, KeyPair):
private_key = obj.getPrivate()
elif isinstance(obj, X509Certificate):
certs.append(obj)
key_store = KeyStore.getInstance(KeyStore.getDefaultType())
key_store.load(None, None)
key_store.setKeyEntry(str(uuid.uuid4()), private_key, [], certs)
kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
kmf.init(key_store, [])
return kmf
def _get_ssl_context(keyfile, certfile, ca_certs):
if certfile is None and ca_certs is None:
return SSLContext.getDefault()
else:
if ca_certs:
# should support composite usage below
trust_managers = _get_ca_certs_trust_manager(ca_certs).getTrustManagers()
else:
trust_managers = None
if certfile:
key_managers = _get_openssl_key_manager(certfile, keyfile).getKeyManagers()
else:
key_managers = None
# cache this lookup in the future to avoid re-reading files on every
# lookup
context = SSLContext.getInstance("SSL")
context.init(key_managers, trust_managers, None)
return context
# CompositeX509KeyManager and CompositeX509TrustManager allow for mixing together Java built-in managers
# with new managers to support Python ssl.
#
# See http://tersesystems.com/2014/01/13/fixing-the-most-dangerous-code-in-the-world/
# for a good description of this composite approach.
#
# Ported to Python from http://codyaray.com/2013/04/java-ssl-with-multiple-keystores
# which was inspired by http://stackoverflow.com/questions/1793979/registering-multiple-keystores-in-jvm
class CompositeX509KeyManager(X509KeyManager):
def __init__(self, key_managers):
self.key_managers = key_managers
def chooseClientAlias(self, key_type, issuers, socket):
for key_manager in self.key_managers:
alias = key_manager.chooseClientAlias(key_type, issuers, socket)
if alias:
return alias;
return None
def chooseServerAlias(self, key_type, issuers, socket):
for key_manager in self.key_managers:
alias = key_manager.chooseServerAlias(key_type, issuers, socket)
if alias:
return alias;
return None
def getPrivateKey(self, alias):
for key_manager in self.key_managers:
private_key = keyManager.getPrivateKey(alias)
if private_key:
return private_key
return None
def getCertificateChain(self, alias):
for key_manager in self.key_managers:
chain = key_manager.getCertificateChain(alias)
if chain:
return chain
return None
def getClientAliases(self, key_type, issuers):
aliases = []
for key_manager in self.key_managers:
aliases.extend(key_manager.getClientAliases(key_type, issuers))
if not aliases:
return None
else:
return aliases
def getServerAliases(self, key_type, issuers):
aliases = []
for key_manager in self.key_managers:
aliases.extend(key_manager.getServerAliases(key_type, issuers))
if not aliases:
return None
else:
return aliases
class CompositeX509TrustManager(X509TrustManager):
def __init__(self, trust_managers):
self.trust_managers = trust_managers
def checkClientTrusted(self, chain, auth_type):
for trust_manager in self.trust_managers:
try:
trustManager.checkClientTrusted(chain, auth_type);
return
except CertificateException:
pass
raise CertificateException("None of the TrustManagers trust this certificate chain")
def checkServerTrusted(self, chain, auth_type):
for trust_manager in self.trust_managers:
try:
trustManager.checkServerTrusted(chain, auth_type);
return
except CertificateException:
pass
raise CertificateException("None of the TrustManagers trust this certificate chain")
def getAcceptedIssuers(self):
certs = []
for trust_manager in self.trust_managers:
certs.extend(trustManager.getAcceptedIssuers())
return certs