Commit 65a3fb46 authored by HgO's avatar HgO
Browse files

migrate to python3, use built-in python logger, use argument parser

parent 01b9306e
import os
import json
import zipfile
import requests
import time
#!/usr/bin/python3
from StringIO import StringIO
from datetime import datetime
from contextlib import contextmanager
from debug_context_manager import debug
@contextmanager
def retry():
for i in range(3):
try:
yield
return
except Exception as e:
print("Warning: exception '%s' occured, retrying" % e)
time.sleep(10)
print "After 3 retry, still failing :("
raise e
from getpass import getpass
import io
import json
import logging
import os
import sys
import time
import zipfile
def renew(login, password):
working_dir = "certs_%s" % datetime.today().strftime("%F_%X")
from OpenSSL import crypto
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
def retry_session(
retries=3,
backoff_factor=0.3,
status_forcelist=(500, 502, 504),
session=None,
):
"""
See: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
"""
session = session or requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
def renew(login, password, client_cert_filename = None, log_level=logging.INFO):
logging.basicConfig(stream=sys.stdout, level=log_level, format="%(levelname)s:%(message)s")
working_dir = "certs_{:%F_%X}".format(datetime.today())
os.makedirs(working_dir)
s = requests.Session()
with debug("Login"):
with retry():
response = s.post("https://api.neutrinet.be/api/user/login", data=json.dumps({"user": login, "password": password}))
assert response.status_code == 200, response.content
session_data = response.json()
with debug("Get client data"):
with retry():
response = s.get('https://api.neutrinet.be/api/client/all?compose=true&user=%s' % session_data["user"], headers={"Session": session_data["token"]})
assert response.status_code == 200, response.content
client = response.json()[0]
openssl_config = """
[ req ]
prompt = no
distinguished_name = req_distinguished_name
[ req_distinguished_name ]
0.organizationName = .
organizationalUnitName = .
emailAddress = %(login)s
localityName = .
stateOrProvinceName = .
countryName = BE
commonName = certificate for %(login)s
""" % {"login": login}
open(os.path.join(working_dir, "config"), "w").write(openssl_config)
with debug("Generate new cert using openssl"):
assert os.system("cd '%s' && openssl req -out CSR.csr -new -newkey rsa:4096 -nodes -keyout client.key -config config" % working_dir) == 0
with debug("See if I already have a cert"):
with retry():
response = s.get("https://api.neutrinet.be/api/client/%s/cert/all?active=true" % client["id"], headers={"Session": session_data["token"]})
assert response.status_code == 200, response.content
cert = response.json()[0] if response.json() else None
if not cert:
print("I don't have any cert, let's add a new one")
with debug("Put new cert online"):
with retry():
response = s.put("https://api.neutrinet.be/api/client/%s/cert/new?rekey=false&validityTerm=1" % client["id"], headers={"Session": session_data["token"]}, data=open(os.path.join(working_dir, "CSR.csr"), "r").read())
assert response.status_code == 200, response.content
with retry_session() as session:
logging.debug("Sending client's credentials")
response = session.post("https://api.neutrinet.be/api/user/login",
json={"user": login, "password": password})
response.raise_for_status()
session_data = response.json()
session_header = {"Session": session_data["token"]}
logging.debug("Retrieving client data")
response = session.get('https://api.neutrinet.be/api/client/all?compose=true',
params={"user": session_data["user"]},
headers=session_header)
response.raise_for_status()
client = response.json()[0]
if os.path.isfile(client_cert_filename):
logging.debug("Checking expiration date for {}".format(client_cert_filename))
with open(client_cert_filename, 'r') as ifd:
client_cert = ifd.read()
if not check_expiration_date(client_cert):
logging.info("The certificate doesn't need to be renewed. Leaving...")
return
logging.debug("Generating new certificate using OpenSSL")
csr, client_key = create_csr(login)
with open(os.path.join(working_dir, "CSR.csr"), 'wb') as ofd:
ofd.write(csr)
with open(os.path.join(working_dir, "client.key"), 'wb') as ofd:
ofd.write(client_key)
logging.debug("Checking if a certificate is already present")
response = session.get("https://api.neutrinet.be/api/client/{id}/cert/all".format(id=client["id"]),
headers=session_header,
params={ "active" : True })
response.raise_for_status()
cert = response.json()[0] if response.json() else None
if not cert:
logging.info("We don't have any certificate, let's add a new one")
logging.debug("Uploading new certificate")
response = session.put("https://api.neutrinet.be/api/client/{id}/cert/new".format(id=client["id"]),
headers=session_header,
data=csr,
params={ "rekey": False, "validityTerm": True })
response.raise_for_status()
cert = response.json()
else:
print("I already have a cert, let's update it")
with debug("Put new cert online"):
with retry():
response = s.put("https://api.neutrinet.be/api/client/%s/cert/%s?rekey=true&validityTerm=1" % (client["id"], cert["id"]), headers={"Session": session_data["token"]}, data=open(os.path.join(working_dir, "CSR.csr"), "r").read())
else:
logging.info("We already have a certificate, let's update it")
logging.debug("Uploading new certificate")
response = session.put("https://api.neutrinet.be/api/client/{client[id]}/cert/{cert[id]}".format(client=client, cert=cert),
headers=session_header,
data=csr,
params={ "rekey": True, "validityTerm": True })
response.raise_for_status()
logging.debug("Downloading new config")
response = session.post("https://api.neutrinet.be/api/client/{id}/config".format(id=client["id"]),
headers=session_header, json={"platform":"linux"})
response.raise_for_status()
logging.debug("Unzipping config")
zipfile.ZipFile(io.BytesIO(response.content)).extractall(working_dir)
with debug("Download new config"):
with retry():
response = s.post("https://api.neutrinet.be/api/client/%s/config" % client["id"], headers={"Session": session_data["token"]}, data=json.dumps({"platform":"linux"}))
assert response.status_code == 200, response.content
return working_dir
with debug("Extract config from zipfile"):
zipfile.ZipFile(StringIO(response.content)).extractall(working_dir)
def check_expiration_date(cert):
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
timestamp = x509.get_notAfter().decode()
expiration_date = datetime.strptime(timestamp, "%Y%m%d%H%M%SZ")
delta = (expiration_date - datetime.now())
return delta.days < 31
def create_csr(email):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 4096)
scr = crypto.X509Req()
req_subject = req.get_subject()
req_subject.C = "BE"
req_subject.CN = email
req_subject.emailAddress = email
req.set_pubkey(key)
req.sign(key, "sha256")
private_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
csr = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req)
return csr, private_key
def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("login")
parser.add_argument("-p", "--password")
parser.add_argument("-d", "--debug", action="store_true")
parser.add_argument("-c", "--cert")
args = parser.parse_args()
if not args.password:
args.password = getpass()
if args.debug:
log_level = logging.DEBUG
else:
log_level = logging.INFO
renew(args.login, args.password, client_cert_filename=args.cert, log_level=log_level)
return working_dir
if __name__ == "__main__":
main()
......@@ -8,73 +8,73 @@ from renew import renew
from debug_context_manager import debug
CRT_PATH = "/etc/openvpn/keys/user.crt"
OPENVPN_CONF_DIR = "/etc/openvpn"
OPENVPN_KEYS_DIR = os.path.join(OPENVPN_CONF_DIR, "keys")
OPENVPN_CREDENTIALS_FILE = os.path.join(OPENVPN_KEYS_DIR, "credentials")
OPENVPN_AUTH_FILE = os.path.join(OPENVPN_KEYS_DIR, "auth")
OPENVPN_USER_CERT = os.path.join(OPENVPN_KEYS_DIR, "user.crt")
OPENVPN_USER_KEY = os.path.join(OPENVPN_KEYS_DIR, "user.key")
OPENVPN_SERVER_CERT = os.path.join(OPENVPN_KEYS_DIR, "ca-server.crt")
def from_cube():
if os.path.exists("/etc/openvpn/keys/credentials"):
login, password = [x.strip() for x in open("/etc/openvpn/keys/credentials", "r").read().split("\n") if x.strip()]
elif os.path.exists("/etc/openvpn/auth"):
login, password = [x.strip() for x in open("/etc/openvpn/auth", "r").read().split("\n") if x.strip()]
if os.path.isfile(OPENVPN_CREDENTIALS_FILE):
with open(OPENVPN_CREDENTIALS_FILE, "r") as ifd:
login, password = [ x.strip() for x in ifd ]
elif os.path.isfile(OPENVPN_AUTH_FILE):
with open(OPENVPN_AUTH_FILE, "r") as ifd:
login, password = [ x.strip() for x in ifd ]
else:
print("Error: I can't find your credentials for neutrinet since neither /etc/openvpn/keys/credentials nor /etc/openvpn/auth exists on your filesystem")
print("Error: Cannot find your credentials for neutrinet since neither {credentials} nor {auth} exists on your filesystem".format(credentials=OPENVPN_CREDENTIALS_FILE, auth=OPENVPN_AUTH_FILE)
sys.exit(1)
in_cron = (sys.argv[1:] and sys.argv[1:][0] == "--cron")
if in_cron and os.path.exists(CRT_PATH):
expiration_date = subprocess.check_output('openssl x509 -in %s -noout -enddate | sed -e "s/.*=//"' % CRT_PATH, shell=True).strip()
expiration_date = datetime.strptime(expiration_date, "%b %d %H:%M:%S %Y GMT")
delta = (expiration_date - datetime.now())
renew_dir = renew(login, password, OPENVPN_USER_CERT)
# only renew if cert expire in less than 4 months
if delta.days > (31 * 4):
sys.exit(0)
_, run_id = renew_dir.split("_", 1)
result_dir = renew(login, password)
with debug("Saving old OpenVPN config"):
shutil.copytree(OPENVPN_CONF_DIR, OPENVPN_CONF_DIR + ".old_{}".format(run_id))
run_id = result_dir.split("_", 1)[1]
with debug("Copying new config"):
shutil.copy("neutrinet_openvpn_config", os.path.join(OPENVPN_CONF_DIR, "client.conf.tpl")
with debug("Saving old openvpn config"):
shutil.copytree("/etc/openvpn/", "/etc/openvpn.old_%s" % run_id)
with debug("copying new config"):
shutil.copy("neutrinet_openvpn_config", "/etc/openvpn/client.conf.tpl")
if not os.path.exists("/etc/openvpn/keys"):
os.makedirs("/etc/openvpn/keys")
if not os.path.exists(OPENVPN_KEYS_DIR):
os.makedirs(OPENVPN_KEYS_DIR)
with debug("Copying new cert"):
shutil.copy(os.path.join(result_dir, "ca.crt"), "/etc/openvpn/keys/ca-server.crt")
shutil.copy(os.path.join(result_dir, "client.crt"), "/etc/openvpn/keys/user.crt")
shutil.copy(os.path.join(result_dir, "client.key"), "/etc/openvpn/keys/user.key")
shutil.copy(os.path.join(renew_dir, "ca.crt"), OPENVPN_SERVER_CERT)
shutil.copy(os.path.join(renew_dir, "client.crt"), OPENVPN_USER_CERT)
shutil.copy(os.path.join(renew_dir, "client.key"), OPENVPN_USER_KEY)
with debug("Adding user credentials"):
open("/etc/openvpn/keys/credentials", "w").write("%s\n%s\n" % (login, password))
with open(OPENVPN_CREDENTIALS_FILE, "w") as ofd:
ofd.write("{}\n{}\n".format(login, password))
commands = [
'yunohost app setting vpnclient server_name -v "vpn.neutrinet.be"',
'yunohost app setting vpnclient server_port -v "1195"',
'yunohost app setting vpnclient server_proto -v "udp"',
'yunohost app setting vpnclient service_enabled -v "1"',
'yunohost app setting vpnclient login_user -v "%s"' % login,
'yunohost app setting vpnclient login_passphrase -v "%s"' % password,
'yunohost app setting vpnclient login_user -v "{}"'.format(login),
'yunohost app setting vpnclient login_passphrase -v "{}"'.format(password),
]
for command in commands:
with debug("Running command '%s'" % command.replace(password, "xxxxxxxxxxxxxxxxxxxxx")):
with debug("Running command '{}'".format(command.replace(password, "xxxxxxxxxxxxxxxxxxxxx"))):
assert os.system(command) == 0, "ERROR: command failed"
sys.stdout.flush()
restart_command = "/usr/local/bin/ynh-vpnclient restart"
print("Critical part: reloading vpnclient using '%s'" % restart_command)
print("Critical part 1: reloading vpnclient using '{}'".format(restart_command))
try:
subprocess.check_output(restart_command.split())
except Exception:
sys.exit(1)
restart_command = "service openvpn restart"
print("Critical part 2: restart openvpn '%s'" % restart_command)
print("Critical part 2: restart openvpn '{}'".format(restart_command))
try:
subprocess.check_output(restart_command.split())
time.sleep(15)
......@@ -91,8 +91,8 @@ def from_cube():
try:
subprocess.check_output("/usr/local/bin/ynh-vpnclient restart".split())
except Exception as e:
print "ERROR: failed to restart hotspot: %s" % e
print "since this is non totally critical, let's continue"
print("ERROR: failed to restart hotspot: {}".format(e))
print("since this is non totally critical, let's continue")
if __name__ == '__main__':
......
import sys
from renew import renew
if __name__ == '__main__':
login, password = sys.argv[1:]
renew(login, password)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment