Commit 032e474b authored by Tharyrok's avatar Tharyrok

For ispng to recognize the certificate as a renewal and not as a new client,...

For ispng to recognize the certificate as a renewal and not as a new client, the CN field of the certificate must match.
The correct format is: "certificate for {email}".
parent 94b0bd2e
......@@ -12,8 +12,39 @@ import zipfile
from OpenSSL import crypto
import requests
from requests.adapters import HTTPAdapter
from requests.exceptions import HTTPError
from requests.packages.urllib3.util.retry import Retry
NEUTRINET_API_URL="https://api.neutrinet.be/api"
OPENSSL_DATE_FORMAT="%Y%m%d%H%M%SZ"
DAYS_BEFORE_RENEWAL=30
CLIENT_KEY_BITS=4096
CLIENT_CSR_SIGN_DIGEST="sha256"
def create_csr(email, commonName):
private_key = crypto.PKey()
private_key.generate_key(crypto.TYPE_RSA, CLIENT_KEY_BITS)
csr = crypto.X509Req()
csr_subject = csr.get_subject()
csr_subject.C = "BE"
csr_subject.CN = commonName
csr_subject.emailAddress = email
csr.set_pubkey(private_key)
csr.sign(private_key, CLIENT_CSR_SIGN_DIGEST)
private_key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, private_key)
csr_pem = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)
return csr_pem, private_key_pem
def filter_ipv4_clients(clients):
for client in clients:
for lease in client['leases']:
if lease['ipVersion'] == 4:
yield client
def retry_session(
retries=3,
backoff_factor=0.3,
......@@ -39,140 +70,194 @@ def retry_session(
return session
def renew(login, password, client_cert_filename=None, working_dir=None, log_level=logging.INFO):
logging.basicConfig(stream=sys.stdout, level=log_level, format="%(levelname)s:%(message)s")
def renew(login, password,
client_cert_filename=None,
output_dir=None,
common_name=None,
email=None,
force=False):
if client_cert_filename and os.path.isfile(client_cert_filename):
logging.debug("Checking expiration date for {}".format(client_cert_filename))
logging.info("Checking expiration date for certificate '{}'"
.format(client_cert_filename))
with open(client_cert_filename, 'r') as ifd:
client_cert = ifd.read()
client_cert_pem = crypto.load_certificate(crypto.FILETYPE_PEM, ifd.read())
if not check_expiration_date(client_cert):
logging.info("The certificate doesn't need to be renewed. Leaving...")
client_cert_expiration_timestamp = client_cert_pem.get_notAfter().decode()
client_cert_expiration_date = datetime.strptime(
client_cert_expiration_timestamp, OPENSSL_DATE_FORMAT)
client_cert_remaining_days = (client_cert_expiration_date - datetime.now()).days
if not force and client_cert_remaining_days > DAYS_BEFORE_RENEWAL:
logging.info("The certificate doesn't expires in {} days. Nothing to do."
.format(client_cert_remaining_days))
return
logging.info("The certificate expires in {}. Renewing..."
.format(client_cert_remaining_days))
if not working_dir:
working_dir = "certs_{:%Y-%m-%d_%H:%M:%S}".format(datetime.today())
os.makedirs(working_dir)
if not output_dir:
output_dir = "certs_{:%Y-%m-%d_%H:%M:%S}".format(datetime.today())
os.makedirs(output_dir, exist_ok=True)
with retry_session() as session:
logging.debug("Sending client's credentials")
response = session.post("https://api.neutrinet.be/api/user/login",
logging.info("Sending client's credentials to Neutrinet VPN...")
response_login = session.post(NEUTRINET_API_URL + "/user/login",
json={"user": login, "password": password})
response.raise_for_status()
response_login.raise_for_status()
session_data = response.json()
session_data = response_login.json()
session_header = {"Session": session_data["token"]}
logging.debug("Retrieving client data")
response = session.get('https://api.neutrinet.be/api/client/all?compose=true',
params={"user": session_data["user"]},
logging.info("Retrieving client data...")
response_clients = session.get(NEUTRINET_API_URL + "/client/all",
params={"user": session_data["user"], "compose": "true"},
headers=session_header)
response.raise_for_status()
client = response.json()[0]
response_clients.raise_for_status()
vpn_clients = response_clients.json()
logging.debug("Generating new certificate using OpenSSL")
csr, client_key = create_csr(login)
try:
vpn_client = next(filter_ipv4_clients(vpn_clients))
except StopIteration:
logging.warning("No IPv4 client found for user {}."
.format(login))
try:
vpn_client = vpn_clients[0]
except IndexError:
logging.error("No client found for user {}. Aborting..."
.format(login))
sys.exit(1)
logging.info("Generating new certificate using OpenSSL...")
with open(os.path.join(working_dir, "CSR.csr"), 'wb') as ofd:
ofd.write(csr)
if common_name is None:
common_name = vpn_client.get('commonName', login)
with open(os.path.join(working_dir, "client.key"), 'wb') as ofd:
ofd.write(client_key)
if email is None:
email = login
logging.debug("Checking if a certificate is already present")
response = session.get("https://api.neutrinet.be/api/client/{id}/cert/all".format(id=client["id"]),
csr_pem, client_key_pem = create_csr(email, common_name)
with open(os.path.join(output_dir, "client.key"), 'wb') as ofd:
ofd.write(client_key_pem)
with open(os.path.join(output_dir, "CSR.csr"), 'wb') as ofd:
ofd.write(csr_pem)
logging.info("Checking if a certificate is already present...")
response_certs = session.get(NEUTRINET_API_URL + "/client/{}/cert/all"
.format(vpn_client['id']),
headers=session_header,
params={ "active" : "true" })
response.raise_for_status()
cert = response.json()[0] if response.json() else None
if not cert:
logging.info("We don't have any certificate, let's add a new one")
logging.debug("Uploading new certificate")
response = session.put("https://api.neutrinet.be/api/client/{id}/cert/new".format(id=client["id"]),
response_certs.raise_for_status()
try:
vpn_cert = response_certs.json()[0]
except Exception:
vpn_cert = None
if not vpn_cert:
logging.info("We don't have any certificate, let's add a new one!")
logging.info("Uploading new certificate...")
response_cert = session.put(NEUTRINET_API_URL + "/client/{}/cert/new"
.format(vpn_client['id']),
headers=session_header,
data=csr,
data=csr_pem,
params={ "rekey": "false", "validityTerm": 1 })
response.raise_for_status()
cert = response.json()
response_cert.raise_for_status()
vpn_cert = response_cert.json()
else:
logging.info("We already have a certificate, let's update it")
logging.debug("Uploading new certificate")
response = session.put("https://api.neutrinet.be/api/client/{client[id]}/cert/{cert[id]}".format(client=client, cert=cert),
logging.info("We already have a certificate, let's update it!")
logging.info("Uploading new certificate...")
response_cert = session.put(NEUTRINET_API_URL + "/client/{}/cert/{}"
.format(vpn_client['id'], vpn_cert['id']),
headers=session_header,
data=csr,
data=csr_pem,
params={ "rekey": "true", "validityTerm": 1 })
response.raise_for_status()
response_cert.raise_for_status()
logging.debug("Downloading new config")
response = session.post("https://api.neutrinet.be/api/client/{id}/config".format(id=client["id"]),
headers=session_header, json={"platform":"linux"})
response.raise_for_status()
logging.debug("Unzipping config")
zipfile.ZipFile(io.BytesIO(response.content)).extractall(working_dir)
for vpn_duplicate_client in vpn_clients:
if vpn_duplicate_client['id'] == vpn_client['id']:
continue
return working_dir
if vpn_duplicate_client['commonName'] != common_name:
continue
def check_expiration_date(cert):
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
timestamp = x509.get_notAfter().decode()
expiration_date = datetime.strptime(timestamp, "%Y%m%d%H%M%SZ")
delta = (expiration_date - datetime.now())
return delta.days < 31
logging.info("Removing duplicate client {}..."
.format(vpn_duplicate_client['id']))
response_deleted_client = session.delete(NEUTRINET_API_URL + "/client/{}"
.format(vpn_duplicate_client['id']),
headers=session_header)
try:
response_deleted_client.raise_for_status()
except HTTPError as e:
logging.warning("Unable to delete client {}: {}"
.format(vpn_duplicate_client['id'], e))
def create_csr(email):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 4096)
req = crypto.X509Req()
req_subject = req.get_subject()
req_subject.C = "BE"
req_subject.CN = email
req_subject.emailAddress = email
req.set_pubkey(key)
req.sign(key, "sha256")
private_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
csr = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req)
return csr, private_key
logging.info("Downloading new config...")
response_config = session.post(NEUTRINET_API_URL + "/client/{}/config"
.format(vpn_client['id']),
headers=session_header,
json={"platform":"linux"})
response_config.raise_for_status()
logging.info("Unzipping config...")
zipfile.ZipFile(io.BytesIO(response_config.content)).extractall(output_dir)
return output_dir
def main():
import argparse
from getpass import getpass
parser = argparse.ArgumentParser(description="Renew certificates for the Neutrinet VPN.")
parser = argparse.ArgumentParser(
description="Renew certificates for the Neutrinet VPN.")
parser.add_argument("login",
help="User login for the Neutrinet VPN.")
parser.add_argument("-p", "--password",
help="User password for the Neutrinet VPN.")
parser.add_argument("-v", "--verbose", action="store_true",
help="Increase verbosity and display debug messages.")
parser.add_argument("-c", "--cert",
help="Public part of the client certificate. \
This forces the script to check if the certificate is expired before renewing it.")
parser.add_argument("-d", "--directory",
parser.add_argument("-d", "--output_dir",
help="Output directory where to store the newly generated certificates. \
By default, everything is stored in a randomly generated directory.")
parser.add_argument("-n", "--common_name",
help="Specify the certificate's Common Name (CN). \
By default, the script will first try to reuse the CN stored on the server. \
Otherwise, it will use the user login as the certificate's CN. \
Use this option only if you know what you are doing!!")
parser.add_argument("-e", "--email",
help="Specify the certificate's email. \
By default, the script will use the user login as the certificate's email.")
parser.add_argument("-f", "--force", action="store_true",
help="Force the certificate renewal.")
parser.add_argument("-v", "--verbose", action="store_true",
help="Increase verbosity and display debug messages.")
parser.add_argument("-q", "--quiet", action="store_true",
help="Quiet mode.")
args = parser.parse_args()
if not args.password:
args.password = getpass()
if args.verbose:
if args.quiet:
log_level = logging.WARNING
elif args.verbose:
log_level = logging.DEBUG
else:
log_level = logging.INFO
renew(args.login, args.password, client_cert_filename=args.cert,
working_dir=args.directory, log_level=log_level)
logging.basicConfig(stream=sys.stdout,
level=log_level,
format="%(levelname)s: %(message)s")
renew(args.login, args.password,
client_cert_filename=args.cert,
output_dir=args.output_dir,
common_name=args.common_name,
email=args.email,
force=args.force)
if __name__ == "__main__":
main()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment