I'm using httplib2 to make a request from my server to another web service. We want to use mutual certificate authentication. I see how to use a certificate for the outgoing connection (`h.add_certificate`), but how do I check the certificate used by the answering server?
This ticket seems to indicate that httplib2 doesn't do it itself, and has only vague suggestions about where to look.
Is it possible? Am I going to have to hack at a lower level?
Here's the code my co-worker Dave St. Germain wrote to solve the problem:
import ssl
import socket
from httplib2 import has_timeout
import httplib2
import socks
class CertificateValidationError(httplib2.HttpLib2Error):
    """Raised when the server does not present a usable certificate."""
def validating_sever_factory(ca_cert_file):
# we need to define a closure here because we don't control
# the arguments this class is instantiated with
class ValidatingHTTPSConnection(httplib2.HTTPSConnectionWithTimeout):
def connect(self):
# begin copypasta from HTTPSConnectionWithTimeout
"Connect to a host on a given (SSL) port."
if self.proxy_info and self.proxy_info.isgood():
sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
sock.setproxy(*self.proxy_info.astuple())
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if has_timeout(self.timeout):
sock.settimeout(self.timeout)
sock.connect((self.host, self.port))
# end copypasta
try:
self.sock = ssl.wrap_socket(sock,
self.key_file,
self.cert_file,
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=ca_cert_file
)
except ssl.SSLError:
# we have to capture the exception here and raise later because
# httplib2 tries to ignore exceptions on connect
import sys
self._exc_info = sys.exc_info()
raise
else:
self._exc_info = None
# this might be redundant
server_cert = self.sock.getpeercert()
if not server_cert:
raise CertificateValidationError(repr(server_cert))
def getresponse(self):
if not self._exc_info:
return httplib2.HTTPSConnectionWithTimeout.getresponse(self)
else:
raise self._exc_info[1], None, self._exc_info[2]
return ValidatingHTTPSConnection
def do_request(url,
               method='GET',
               body=None,
               headers=None,
               keyfile=None,
               certfile=None,
               ca_certs=None,
               proxy_info=None,
               timeout=30):
    """Make an http/https request, with optional client certificate and
    server certificate verification.

    Args:
        url: absolute URL to request.
        method: HTTP method name.
        body: request body, or None.
        headers: request headers, or None.
        keyfile: client private key file; only used for https URLs and only
            when ``certfile`` is given as well.
        certfile: client certificate file (see ``keyfile``).
        ca_certs: CA certificate file; when given for an https URL the
            server certificate is validated against it.
        proxy_info: passed through to ``httplib2.Http``.
        timeout: passed through to ``httplib2.Http``.

    Returns:
        The ``(response, content)`` pair returned by ``Http.request``.
    """
    request_kwargs = {}
    h = httplib2.Http(proxy_info=proxy_info, timeout=timeout)

    # URI schemes are case-insensitive, so compare in lower case; otherwise
    # "HTTPS://..." would silently skip certificate validation and the
    # client certificate.
    is_ssl = url.lower().startswith('https')

    if is_ssl and ca_certs:
        request_kwargs['connection_type'] = validating_sever_factory(ca_certs)
    if is_ssl and keyfile and certfile:
        # Empty domain string, as in the original call.
        h.add_certificate(keyfile, certfile, '')

    return h.request(url, method=method, body=body, headers=headers,
                     **request_kwargs)
If you love us, you can donate via PayPal or buy us a coffee so we can maintain and grow the site. Thank you!
Donate Us With