#!/usr/bin/python3 import argparse import configparser import datetime import hashlib import os import pathlib import secrets import sys import urllib.parse CHRSET_NUM = "0123456789" CHRSET_alpha = "abcdefghijklmnopqrstuvwxyz" CHRSET_ALPHA = CHRSET_alpha.upper() CHRSET_ALNUM = CHRSET_NUM + CHRSET_alpha + CHRSET_ALPHA CHRSET_af = "abcdef" CHRSET_AF = CHRSET_af.upper() CHRSET_HEX = CHRSET_NUM + CHRSET_af + CHRSET_AF class ConfigReader(object): def __init__(self, path): try: self.__p = configparser.ConfigParser() self.__p.read(path) except configparser.Error as e: raise Exception("Failed to parse config.") def __get(self, section, option): try: return self.__p.get(section, option) except configparser.Error as e: raise Exception(f"Failed to parse config {section}/{option}.") def __getlist(self, section, optionbase): try: return [ (int(v[0].split("_")[-1]), v[1]) for v in self.__p.items(section) if v[0].startswith(optionbase + "_") ] except (configparser.Error, ValueError, IndexError) as e: raise Exception(f"Failed to parse config list {section}/{optionbase}_.") def getUsers(self): return self.__getlist("CREDENTIALS", "user") def getPw(self, userIndex): fields = self.__get("CREDENTIALS", f"pw_{userIndex}").split(":") try: if fields[2] != "pbkdf2hmac": raise ValueError return (bytes.fromhex(fields[0]), # password hash bytes.fromhex(fields[1]), # salt fields[3].strip().lower(), # kdf hash function int(fields[4]) # kdf iterations ) except (IndexError, ValueError) as e: raise Exception(f"Failed to parse config CREDENTIALS/pw_{index}.") def getDomains(self, userIndex): return self.__getlist(f"DOMAINS_user_{userIndex}", "domain") def getDomainByName(self, userIndex, domain): for confDomainIndex, confDomain in self.getDomains(userIndex=userIndex): if confDomain == domain: return confDomainIndex return None def getInfofile(self, userIndex, domainIndex): return self.__get(f"DOMAINS_user_{userIndex}", f"infofile_domain_{domainIndex}") def out(s=""): print(s, "\r\n", file=sys.stdout, sep="", end="") def error(msg): out("Status: 500 Internal Server Error") out() # End of headers. out("Error.") # out(msg) raise Exception(msg) def update(conf, userIndex, domainIndex, domain, ip4, ip6, ip6pfx): infofile = conf.getInfofile(userIndex=userIndex, domainIndex=domainIndex) with open(infofile, "w") as f: f.write(f"domain={domain}\n" f"ip4={ip4}\n" f"ip6={ip6}\n" f"ip6pfx={ip6pfx}\n" f"updated={datetime.datetime.now().isoformat()}\n") out("Status: 200 Ok") out() # End of headers. out("Ok.") return 0 def checkCredentials(conf, user, pw): ok = False foundUserIndex = None for confUserIndex, confUserName in conf.getUsers(): if secrets.compare_digest(confUserName, user): foundUserIndex = confUserIndex if foundUserIndex is not None: phashExp, salt, kdfHash, kdfIter = conf.getPw(userIndex=foundUserIndex) phashAct = hashlib.pbkdf2_hmac(hash_name=kdfHash, password=pw.encode("UTF-8"), salt=salt, iterations=kdfIter) ok = secrets.compare_digest(phashAct, phashExp) return ok, foundUserIndex def runAdminCmds(conf, args): if args.hash_pw is not None: salt = secrets.token_bytes(args.salt_bytes) phash = hashlib.pbkdf2_hmac(hash_name=args.kdf_hash.lower(), password=args.hash_pw.encode("UTF-8"), salt=salt, iterations=args.kdf_iter) print(f"{phash.hex()}:" f"{salt.hex()}:" f"pbkdf2hmac:" f"{args.kdf_hash.lower()}:" f"{args.kdf_iter}") return 0 return 1 def main(): p = argparse.ArgumentParser() p.add_argument("-c", "--config", type=pathlib.Path, default=pathlib.Path("/etc/dyndns-update.conf"), help="Path to configuration file.") p.add_argument("-p", "--hash-pw", type=str, help="Hash a password string, then exit.") p.add_argument("-H", "--kdf-hash", type=str, default="sha256", help="Hash function for --hash-pw.") p.add_argument("-I", "--kdf-iter", type=int, default=1000, help="Number of KDF iterations for --hash-pw.") p.add_argument("-S", "--salt-bytes", type=int, default=16, help="Number of salt bytes for --hash-pw.") args = p.parse_args() conf = ConfigReader(args.config) if args.hash_pw is not None: return runAdminCmds(conf=conf, args=args) out("Content-type: text/plain") qs = os.environ.get("QUERY_STRING", "") if not qs: return error("No query string.") forms = urllib.parse.parse_qs(qs) def getform(key, default=""): values = forms.get(key, None) if values: return values[-1] return default ok, userIndex = checkCredentials(conf=conf, user=getform("user"), pw=getform("pw")) if not ok: return error("Invalid credentials.") domain = getform("domain") ip4 = getform("ip4") ip6 = getform("ip6") ip6pfx = getform("ip6pfx") if (not domain or len(domain) > 128 or not all(c in (CHRSET_ALNUM + ".-") for c in domain)): return error("Invalid domain.") if (len(ip4) > (4*3)+3 or not all(c in (CHRSET_NUM + ".") for c in ip4)): return error("Invalid ip4") if (len(ip6) > (8*4)+7 or not all(c in (CHRSET_HEX + ":") for c in ip6)): return error("Invalid ip6") if (len(ip6pfx) > (8*4)+7+1+3 or not all(c in (CHRSET_HEX + ":/") for c in ip6pfx)): return error("Invalid ip6pfx") domainIndex = conf.getDomainByName(userIndex=userIndex, domain=domain) if domainIndex is None: return error("Unknown domain.") return update(conf=conf, userIndex=userIndex, domainIndex=domainIndex, domain=domain, ip4=ip4, ip6=ip6, ip6pfx=ip6pfx) sys.exit(main())