Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions certmitm.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
#!/usr/bin/python3

import struct, OpenSSL, re, socket, argparse, os, random, sys, datetime, ssl, shutil, select, copy, time
import socket, argparse, sys, ssl, select

import os
import _thread
import tempfile, json
import logging, threading
import tempfile
import logging

import certmitm.util
import certmitm.certtest
import certmitm.connection

description = r"""
Expand Down Expand Up @@ -167,7 +166,7 @@ def threaded_connection_handler(downstream_socket):
logger.debug(f"sending to client: {from_server}")
else:
# We should never arrive here
logger.exception(f"Select returned unknown connection")
logger.exception("Select returned unknown connection")
else:
continue
break
Expand Down Expand Up @@ -196,7 +195,7 @@ def threaded_connection_handler(downstream_socket):
# Close TLS gracefully
mitm_connection.downstream_socket.unwrap()
mitm_connection.upstream_socket.unwrap()
except:
except Exception:
pass
# Close TCP gracefully
mitm_connection.downstream_socket.close()
Expand Down
6 changes: 2 additions & 4 deletions certmitm/certtest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import OpenSSL
import ssl
import os
import certmitm.util
import copy

class certtest(object):

Expand Down Expand Up @@ -31,7 +29,7 @@ def generate_test_context(original_cert_chain_pem, hostname, working_dir, logger
for tmp_cert_pem in original_cert_chain_pem:
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, tmp_cert_pem)
tmp_cert_chain.append(cert)
name = f"self_signed"
name = "self_signed"
tmp_cert_chain[0].set_issuer(tmp_cert_chain[0].get_subject())
tmp_cert_chain[0], key = certmitm.util.sign_certificate(tmp_cert_chain[0], issuer_cert=None)
certfile, keyfile = certmitm.util.save_certificate_chain([tmp_cert_chain[0]], key, working_dir, name=hostname+"_"+name)
Expand All @@ -42,7 +40,7 @@ def generate_test_context(original_cert_chain_pem, hostname, working_dir, logger
for tmp_cert_pem in original_cert_chain_pem:
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, tmp_cert_pem)
tmp_cert_chain.append(cert)
name = f"replaced_key"
name = "replaced_key"
tmp_cert_chain[0], key = certmitm.util.replace_public_key(tmp_cert_chain[0])
certfile, keyfile = certmitm.util.save_certificate_chain(tmp_cert_chain, key, working_dir, name=hostname+"_"+name)
yield certtest(name, hostname, certfile, keyfile, original_cert_chain_pem)
Expand Down
16 changes: 8 additions & 8 deletions certmitm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, client_socket, logger):
self.client_port = int(self.client_name.split(" ")[1].split(')')[0]) #Dirty I know :)
self.upstream_ip, self.upstream_port = certmitm.util.sock_to_dest(self.client_socket)
if self.upstream_ip == "127.0.0.1" and self.upstream_port == 9900:
self.logger.debug(f"Setting debug upstream")
self.logger.debug("Setting debug upstream")
self.upstream_port = 10000
try:
self.upstream_sni = certmitm.util.SNIFromHello(self.client_socket.recv(4096, socket.MSG_PEEK))
Expand Down Expand Up @@ -108,7 +108,7 @@ def log(self, timestamp, who, what):

def get_test(self):
# If the tests have not yet been generated
if self.test_list == None:
if self.test_list is None:
with self.lock:
if not self.test_list:
# Get upstream fullchain from the server
Expand Down Expand Up @@ -176,28 +176,28 @@ def __init__(self, downstream_socket, logger):
self.downstream_tls_buf = b""

def set_upstream(self, ip, port):
self.logger.debug(f"connecting to TCP upstream")
self.logger.debug("connecting to TCP upstream")
self.upstream_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.upstream_socket.settimeout(10)
try:
self.upstream_socket.connect((ip, port))
self.upstream_tls = False
self.logger.debug(f"connected to TCP upstream")
self.logger.debug("connected to TCP upstream")
except (ConnectionRefusedError, TimeoutError, OSError) as e:
self.logger.debug(f"Upstream connection failed with {e}")
self.upstream_socket = None

def wrap_downstream(self, context):
self.logger.debug(f"Wrapping downstream with TLS")
self.logger.debug("Wrapping downstream with TLS")
self.downstream_socket = context.wrap_socket(self.downstream_socket, server_side=True)
self.downstream_socket.settimeout(10)
self.downstream_tls = True
self.logger.debug(f"Wrapped downstream with TLS")
self.logger.debug("Wrapped downstream with TLS")

def wrap_upstream(self, hostname):
self.logger.debug(f"Wrapping upstream with TLS")
self.logger.debug("Wrapping upstream with TLS")
self.upstream_context = certmitm.util.create_client_context()
self.upstream_socket = self.upstream_context.wrap_socket(self.upstream_socket, server_hostname=hostname)
self.upstream_socket.settimeout(10)
self.upstream_tls = True
self.logger.debug(f"Wrapped upstream with TLS")
self.logger.debug("Wrapped upstream with TLS")
1 change: 1 addition & 0 deletions certmitm/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def delete_extension(cert, extension):
# Saves a certificate/key pair and returns the filenames for them
def save_certificate_chain(certs, key, working_dir, name=None):
if not name:
# XXX cert is an undefined variable
name = str(cert.get_subject().commonName)
directory = os.path.join(working_dir, "certificates")
if not os.path.isdir(directory):
Expand Down