From a7704d82a165fe03eabc0f79c0904c5e6e06587a Mon Sep 17 00:00:00 2001 From: maybetree Date: Thu, 14 Aug 2025 20:28:11 +0200 Subject: [PATCH] initial commit --- .gitignore | 102 +++++++++++++++++++++++++++++++++++++++++++ klankdns.py | 121 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 223 insertions(+) create mode 100644 .gitignore create mode 100755 klankdns.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cc5c62b --- /dev/null +++ b/.gitignore @@ -0,0 +1,102 @@ +### Generic ### + +# VIM +*.sw* +*.vim* +*.obsession* + +# misc. garbage +*.bak +*.old* +~* +*~ +- + +# Damn you steve jobs!!!! +.DS_Store + +# vscope +.vscode/* +*.code-workspace +.history/ + +# potential credentials +*.pem +*.secret +**/credentials.json +**/client_secrets.json +**/client_secret.json +*creds* +*.dat +*password* +*.httr-oauth* +*.env +!*.sample.env + +# C/C++ +*.o +*.out +*.a +*.so + +# Python +__pycache__/ +*.py[cod] +*$py.class +.clirope +*.egg-info +build +dist +coverage.json +htmlcov +*.spec +.coverage + +# Archives +*.zip +*.tar +*.rar +*.tar.* +*.tzst +*.tgz +*.txz +*.zst +*.gz + +# Multimedia +*.xcf +*.svg +*.jpg +*.jpeg +*.png +*.avif +*.gif +*.pdf +*.gv +!img/*.jpg +!img/*.png +!img/*.svg +!img/*.gv + +# CMake + +CMakeCache.txt +CMakeFiles +cmake_install.cmake +*.cmake +.cmake +.skbuild-info.json +CMakeInit.txt + +# Rust + +target + +### Project-specific ### + +*.secret.toml +secret.toml + + + + diff --git a/klankdns.py b/klankdns.py new file mode 100755 index 0000000..593dc5c --- /dev/null +++ b/klankdns.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 + +import re +import json +import urllib +import tomllib +import urllib.parse +import urllib.request +from pathlib import Path + +#contents = urllib.request.urlopen("http://example.com/foo/bar").read() + +# 5 curl -X POST -L https://api.porkbun.com/api/json/v3/dns/retrieveByNameType/klank.school/A/www --data '{"secretapikey":""}' + +class API(): + def __init__(self): + + with (Path(__file__).parent / 'klankdns.secret.toml').open('rb') as f: + toml = tomllib.load(f) + + self.key = toml['klankdns']['key'] + self.secret = toml['klankdns']['secret'] + self.base = toml['klankdns']['base'] + self.entries = toml['klankdns']['entries'] + + self.public_ip = self.get_public_ip() + + def get_public_ip(self): + req = urllib.request.Request("https://ipinfo.io/ip") + with urllib.request.urlopen(req) as response: + result = response.read().decode('utf-8') + + if not re.match(r'[\d]+\.[\d]+\.[\d]+\.[\d]+', result): + raise Exception("ipinfo returned bullshit!!") + + print(f"Got public IP: {result}") + + return result + + def post(self, url, data = None): + url = urllib.parse.urljoin(self.base, url) + fulldata = { + "secretapikey": self.secret, + "apikey": self.key, + **(data or {}) + } + print(fulldata) + + req = urllib.request.Request(url) + req.add_header('Content-Type', 'application/json') + + try: + with urllib.request.urlopen(req, json.dumps(fulldata).encode('utf_8')) as response: + result = response.read().decode('utf-8') + except urllib.error.HTTPError as err: + print(err.read().decode('utf-8')) + raise err + + return result + + def get_by_name(self, name): + return self.post(urllib.parse.urljoin("dns/retrieveByNameType/", name)) + + def check(self): + for entry in self.entries: + result = self.get_by_name(entry) + print(f"{entry} : {result}") + + def get_current_ip(self, entry): + return json.loads(self.get_by_name(entry))['records'][0]['content'] + + def update(self): + for entry in self.entries: + dat = json.loads(self.get_by_name(entry)) + if dat["status"] != "SUCCESS": + raise Exception("???") + + if len(dat["records"]) > 1: + raise Exception("???") + + if len(dat["records"]) == 0: + print(f"Creating {entry}") + result = self.post( + # TODO this is spaghettt + f := urllib.parse.urljoin("dns/create/", entry.split('/')[0]), + { + "name": entry.split('/')[2], + "type": entry.split('/')[1], + "content": self.public_ip, + "ttl": "600" + } + ) + print(f"{entry} : {result}") + if json.loads(result)["status"] != "SUCCESS": + raise Exception("???") + + if len(dat["records"]) == 1: + if dat["records"][0]["content"] == self.public_ip: + print(f"Not updating {entry}") + else: + print(f"updating {entry}") + + result = self.post( + f := urllib.parse.urljoin("dns/editByNameType/", entry), + { + "content": self.public_ip, + "ttl": 600, + } + ) + print(f"{entry} : {result}") + if json.loads(result)["status"] != "SUCCESS": + raise Exception("???") + + + +if __name__ == '__main__': + api = API() + api.update() + #api.check() + +