Unverified Commit b3c49c23 authored by Thomas Balthazar's avatar Thomas Balthazar Committed by GitHub
Browse files

Merge pull request #1 from hidrarga/v3.0

Migrate to python3
parents 01b9306e 2af66b21
# Neutrinet VPN Certificates renewer
This repository store 2 python scripts to renew your Neutrinet VPN certificate.
One as to be runned on an Internet Cube that has vpnclient installed, the other
one is to be runned in a standalone fashion.
This repository stores a python3 script to renew your Neutrinet VPN certificate.
**Remark**: this repository is used by the [neutrinet_ynh](https://github.com/Neutrinet/neutrinet_ynh) app, so don't rename/delete it unless you know what you are doing.
You can run this script from your own computer.
# Usage
You can also run this script from your internet cube.
Note that there is a [Yunohost app called Neutrinet](https://github.com/Neutrinet/neutrinet_ynh) just for that.
It will setup a daily cron task that will automatically renew your certificate when needed.
## Standalone version
**Warning**: As it is used by the Yunohost app, do NOT rename or delete the script unless you know what you are doing.
## Installation
git clone https://github.com/neutrinet/renew_cert
cd renew_cert
virtualenv ve
source ve/bin/activate
pip install -r requirements.txt
git clone https://github.com/neutrinet/renew_cert
cd renew_cert
virtualenv ve
source ve/bin/activate
pip install -r requirements.txt
## Usage
python renew_local.py <login> <password>
python3 renew.py <login>
This will generate the corresponding files in a folder names like that:
`certs_2016-06-07_13:51:08` (where the date and time and the one corresponding
to the moment at which you've runned the script).
This will prompt you to enter your password for the Neutrinet's VPN.
## On the Internet Cube version
The script will then generate the certificate files in a folder named like that:
where the date and time correspond to the moment at which you ran the script.
**Except if you know what you are doing and has specific reasons to directly run this script, uses this YunoHost application instead https://github.com/Neutrinet/neutrinet_ynh/**
You can also provide the folder with:
python3 renew.py <login> -d <path/to/your/certs>
The installation procedure on the cube is a bit different (**do everything in root or with sudo**):
**Important**: This folder will contain your private key, so be careful when storing it!
git clone https://github.com/neutrinet/renew_cert
cd renew_cert
virtualenv ve --system-site-packages
source ve/bin/activate
pip install -r requirements.txt
You can choose to directly provide the password with:
python3 renew.py <login> -p <password>
Finally, you can provide the public part of your certificate.
The script will then check the expiration date before trying to renew it:
python3 renew.py <login> -c <path/to/client.crt>
python renew_for_cube.py
## Debugging
**Be aware that this will modify your vpn configuration and certs files and
restart openvpn (and the hotspot if it's installed).** It will also save your
whole `/etc/openvpn/` in a `/etc/openvpn.old_2016-06-07_19:39:53` (where the
date and time and the one corresponding to the moment at which you've runned
the script) if you need to rollback.
**If something wents wrong, you can lose your vpn connection, run that in a
situation where you can access locally your cube in ssh** (or live the YOLO
life style).
You can display debug messages with:
python3 renew.py <login> -v
import os
import json
import zipfile
import requests
import time
from StringIO import StringIO
from datetime import datetime
from contextlib import contextmanager
from debug_context_manager import debug
import io
import json
import logging
import os
import sys
import time
import zipfile
def retry():
for i in range(3):
from OpenSSL import crypto
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
def retry_session(
status_forcelist=(500, 502, 504),
See: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
session = session or requests.Session()
retry = Retry(
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
def renew(login, password, client_cert_filename=None, working_dir=None, log_level=logging.INFO):
logging.basicConfig(stream=sys.stdout, level=log_level, format="%(levelname)s:%(message)s")
if client_cert_filename and os.path.isfile(client_cert_filename):
logging.debug("Checking expiration date for {}".format(client_cert_filename))
with open(client_cert_filename, 'r') as ifd:
client_cert = ifd.read()
if not check_expiration_date(client_cert):
logging.info("The certificate doesn't need to be renewed. Leaving...")
except Exception as e:
print("Warning: exception '%s' occured, retrying" % e)
print "After 3 retry, still failing :("
raise e
def renew(login, password):
working_dir = "certs_%s" % datetime.today().strftime("%F_%X")
if not working_dir:
working_dir = "certs_{:%Y-%m-%d_%H:%M:%S}".format(datetime.today())
s = requests.Session()
with debug("Login"):
with retry():
response = s.post("https://api.neutrinet.be/api/user/login", data=json.dumps({"user": login, "password": password}))
assert response.status_code == 200, response.content
session_data = response.json()
with debug("Get client data"):
with retry():
response = s.get('https://api.neutrinet.be/api/client/all?compose=true&user=%s' % session_data["user"], headers={"Session": session_data["token"]})
assert response.status_code == 200, response.content
client = response.json()[0]
openssl_config = """
[ req ]
prompt = no
distinguished_name = req_distinguished_name
[ req_distinguished_name ]
0.organizationName = .
organizationalUnitName = .
emailAddress = %(login)s
localityName = .
stateOrProvinceName = .
countryName = BE
commonName = certificate for %(login)s
""" % {"login": login}
open(os.path.join(working_dir, "config"), "w").write(openssl_config)
with debug("Generate new cert using openssl"):
assert os.system("cd '%s' && openssl req -out CSR.csr -new -newkey rsa:4096 -nodes -keyout client.key -config config" % working_dir) == 0
with debug("See if I already have a cert"):
with retry():
response = s.get("https://api.neutrinet.be/api/client/%s/cert/all?active=true" % client["id"], headers={"Session": session_data["token"]})
assert response.status_code == 200, response.content
cert = response.json()[0] if response.json() else None
if not cert:
print("I don't have any cert, let's add a new one")
with debug("Put new cert online"):
with retry():
response = s.put("https://api.neutrinet.be/api/client/%s/cert/new?rekey=false&validityTerm=1" % client["id"], headers={"Session": session_data["token"]}, data=open(os.path.join(working_dir, "CSR.csr"), "r").read())
assert response.status_code == 200, response.content
with retry_session() as session:
logging.debug("Sending client's credentials")
response = session.post("https://api.neutrinet.be/api/user/login",
json={"user": login, "password": password})
session_data = response.json()
session_header = {"Session": session_data["token"]}
logging.debug("Retrieving client data")
response = session.get('https://api.neutrinet.be/api/client/all?compose=true',
params={"user": session_data["user"]},
client = response.json()[0]
logging.debug("Generating new certificate using OpenSSL")
csr, client_key = create_csr(login)
with open(os.path.join(working_dir, "CSR.csr"), 'wb') as ofd:
with open(os.path.join(working_dir, "client.key"), 'wb') as ofd:
logging.debug("Checking if a certificate is already present")
response = session.get("https://api.neutrinet.be/api/client/{id}/cert/all".format(id=client["id"]),
params={ "active" : "true" })
cert = response.json()[0] if response.json() else None
if not cert:
logging.info("We don't have any certificate, let's add a new one")
logging.debug("Uploading new certificate")
response = session.put("https://api.neutrinet.be/api/client/{id}/cert/new".format(id=client["id"]),
params={ "rekey": "false", "validityTerm": 1 })
cert = response.json()
print("I already have a cert, let's update it")
with debug("Put new cert online"):
with retry():
response = s.put("https://api.neutrinet.be/api/client/%s/cert/%s?rekey=true&validityTerm=1" % (client["id"], cert["id"]), headers={"Session": session_data["token"]}, data=open(os.path.join(working_dir, "CSR.csr"), "r").read())
with debug("Download new config"):
with retry():
response = s.post("https://api.neutrinet.be/api/client/%s/config" % client["id"], headers={"Session": session_data["token"]}, data=json.dumps({"platform":"linux"}))
assert response.status_code == 200, response.content
with debug("Extract config from zipfile"):
logging.info("We already have a certificate, let's update it")
logging.debug("Uploading new certificate")
response = session.put("https://api.neutrinet.be/api/client/{client[id]}/cert/{cert[id]}".format(client=client, cert=cert),
params={ "rekey": "true", "validityTerm": 1 })
logging.debug("Downloading new config")
response = session.post("https://api.neutrinet.be/api/client/{id}/config".format(id=client["id"]),
headers=session_header, json={"platform":"linux"})
logging.debug("Unzipping config")
return working_dir
def check_expiration_date(cert):
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
timestamp = x509.get_notAfter().decode()
expiration_date = datetime.strptime(timestamp, "%Y%m%d%H%M%SZ")
delta = (expiration_date - datetime.now())
return delta.days < 31
def create_csr(email):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 4096)
req = crypto.X509Req()
req_subject = req.get_subject()
req_subject.C = "BE"
req_subject.CN = email
req_subject.emailAddress = email
req.sign(key, "sha256")
private_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
csr = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req)
return csr, private_key
def main():
import argparse
from getpass import getpass
parser = argparse.ArgumentParser(description="Renew certificates for the Neutrinet VPN.")
help="User login for the Neutrinet VPN.")
parser.add_argument("-p", "--password",
help="User password for the Neutrinet VPN.")
parser.add_argument("-v", "--verbose", action="store_true",
help="Increase verbosity and display debug messages.")
parser.add_argument("-c", "--cert",
help="Public part of the client certificate. \
This forces the script to check if the certificate is expired before renewing it.")
parser.add_argument("-d", "--directory",
help="Output directory where to store the newly generated certificates. \
By default, everything is stored in a randomly generated directory.")
args = parser.parse_args()
if not args.password:
args.password = getpass()
if args.verbose:
log_level = logging.DEBUG
log_level = logging.INFO
renew(args.login, args.password, client_cert_filename=args.cert,
working_dir=args.directory, log_level=log_level)
if __name__ == "__main__":
import os
import sys
import time
import shutil
import subprocess
from datetime import datetime
from renew import renew
from debug_context_manager import debug
CRT_PATH = "/etc/openvpn/keys/user.crt"
def from_cube():
if os.path.exists("/etc/openvpn/keys/credentials"):
login, password = [x.strip() for x in open("/etc/openvpn/keys/credentials", "r").read().split("\n") if x.strip()]
elif os.path.exists("/etc/openvpn/auth"):
login, password = [x.strip() for x in open("/etc/openvpn/auth", "r").read().split("\n") if x.strip()]
print("Error: I can't find your credentials for neutrinet since neither /etc/openvpn/keys/credentials nor /etc/openvpn/auth exists on your filesystem")
in_cron = (sys.argv[1:] and sys.argv[1:][0] == "--cron")
if in_cron and os.path.exists(CRT_PATH):
expiration_date = subprocess.check_output('openssl x509 -in %s -noout -enddate | sed -e "s/.*=//"' % CRT_PATH, shell=True).strip()
expiration_date = datetime.strptime(expiration_date, "%b %d %H:%M:%S %Y GMT")
delta = (expiration_date - datetime.now())
# only renew if cert expire in less than 4 months
if delta.days > (31 * 4):
result_dir = renew(login, password)
run_id = result_dir.split("_", 1)[1]
with debug("Saving old openvpn config"):
shutil.copytree("/etc/openvpn/", "/etc/openvpn.old_%s" % run_id)
with debug("copying new config"):
shutil.copy("neutrinet_openvpn_config", "/etc/openvpn/client.conf.tpl")
if not os.path.exists("/etc/openvpn/keys"):
with debug("Copying new cert"):
shutil.copy(os.path.join(result_dir, "ca.crt"), "/etc/openvpn/keys/ca-server.crt")
shutil.copy(os.path.join(result_dir, "client.crt"), "/etc/openvpn/keys/user.crt")
shutil.copy(os.path.join(result_dir, "client.key"), "/etc/openvpn/keys/user.key")
with debug("Adding user credentials"):
open("/etc/openvpn/keys/credentials", "w").write("%s\n%s\n" % (login, password))
commands = [
'yunohost app setting vpnclient server_name -v "vpn.neutrinet.be"',
'yunohost app setting vpnclient server_port -v "1195"',
'yunohost app setting vpnclient server_proto -v "udp"',
'yunohost app setting vpnclient service_enabled -v "1"',
'yunohost app setting vpnclient login_user -v "%s"' % login,
'yunohost app setting vpnclient login_passphrase -v "%s"' % password,
for command in commands:
with debug("Running command '%s'" % command.replace(password, "xxxxxxxxxxxxxxxxxxxxx")):
assert os.system(command) == 0, "ERROR: command failed"
restart_command = "/usr/local/bin/ynh-vpnclient restart"
print("Critical part: reloading vpnclient using '%s'" % restart_command)
except Exception:
restart_command = "service openvpn restart"
print("Critical part 2: restart openvpn '%s'" % restart_command)
except Exception:
print("ERROR: command failed, displaying logs")
print("\n".join(open("/var/log/openvpn.log", "r").split("\n")[-200:]))
if os.path.exists("/usr/local/bin/ynh-vpnclient"):
print("Few, we're done, let's wait 2min to be sure that vpn is running, then restart hotspot")
print("Restarting hotspot")
subprocess.check_output("/usr/local/bin/ynh-vpnclient restart".split())
except Exception as e:
print "ERROR: failed to restart hotspot: %s" % e
print "since this is non totally critical, let's continue"
if __name__ == '__main__':
import sys
from renew import renew
if __name__ == '__main__':
login, password = sys.argv[1:]
renew(login, password)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment