commit cd6668de3221875f6217d7bc4cfa23e9e0768fa5 Author: Adam Rabjerg Date: Tue Jun 29 00:32:34 2021 +0200 inital diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cf0c42e --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +venv* +.idea +gnupg* +python-* +__pycache__ +*.gpg \ No newline at end of file diff --git a/demo.py b/demo.py new file mode 100644 index 0000000..33095e6 --- /dev/null +++ b/demo.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 + +from ob_save import get_fingerprint_by_email, write_gpg_file, read_gpg_file +import json + +""" First we open the normal json data and loads it as a dict, this is our dummy data. "raw_json" """ +with open("json_stuff.txt", "r") as json_file: + raw_json = json.load(json_file) + +""" Lets show the raw json, and verify that it is a dictionary""" +print("#"*30, "\n") +print("raw_json content:", raw_json) +print("raw_json type: ", type(raw_json)) +print("\n", "#"*30, "\n") + +""" Setup constants """ +signature_email = "b1-systems@example.com" +gpg_fingerprint = get_fingerprint_by_email(signature_email) + +""" Lets sign and write the content to a new file""" +print("Writing json to file.") +write_gpg_file(raw_json, "target_filename.txt", fingerprint=gpg_fingerprint) + +""" Lets read back the content, AND verify that the signature is valid """ +gpg_content = read_gpg_file("more_json_stuff.txt"+".gpg", verify=True) + +""" gpg_content is a dictonary with more than just the json, so lets split it in its nessecary parts.""" +readback_json = json.loads(gpg_content["json"]) +verified_status = gpg_content["verified"] + +print("\n", "#"*30, "\n") +print("Is original json and readback json identical?: {}".format(readback_json == raw_json)) +print("Is signature valid?: {}".format(verified_status)) diff --git a/json_stuff.txt b/json_stuff.txt new file mode 100644 index 0000000..30bb10b --- /dev/null +++ b/json_stuff.txt @@ -0,0 +1,40 @@ +{ + "type": "result", + "timestamp": "2021-06-23T22:50:21Z", + "ping": { + "jitter": 0.23300000000000001, + "latency": 13.122999999999999 + }, + "download": { + "bandwidth": 11050354, + "bytes": 86103584, + "elapsed": 7801 + }, + "upload": { + "bandwidth": 4113077, + "bytes": 14873760, + "elapsed": 3614 + }, + "packetLoss": 0, + "isp": "Deutsche Telekom AG", + "interface": { + "internalIp": "192.168.178.60", + "name": "eth0", + "macAddr": "DC:A6:32:5D:54:50", + "isVpn": false, + "externalIp": "91.19.224.10" + }, + "server": { + "id": 36896, + "name": "AlphaCron", + "location": "Apfelstädt", + "country": "Germany", + "host": "speed.alphacron.de", + "port": 8080, + "ip": "134.97.32.23" + }, + "result": { + "id": "d9e6c6d6-55c5-4359-9bd0-a48190ed27e9", + "url": "https://www.speedtest.net/result/c/d9e6c6d6-55c5-4359-9bd0-a48190ed27e9" + } +} \ No newline at end of file diff --git a/ob_save.py b/ob_save.py new file mode 100644 index 0000000..e035370 --- /dev/null +++ b/ob_save.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 + +import json +import gnupg + +""" Now rewrite all this doodoo into a proper class """ + +gnupg_homedir = "./gnupg_home/" +keyring_file = "pubring.kbx" +json_filename = 'json_stuff.txt' + +gpg = gnupg.GPG(gnupghome=gnupg_homedir, keyring=gnupg_homedir+keyring_file) + + +def get_fingerprint_by_email(email): + for key in gpg.list_keys(): + if email in key["uids"][0]: + return key["fingerprint"] + return None + + +def get_keyid_by_email(email): + for key in gpg.list_keys(): + if email in key["uids"][0]: + return key["keyid"] + return None + + +def read_gpg_file(filename: str, verify: bool = None) -> dict: + """ + Opposite function of write_gpg_file() + splits message, signature and hash from input file. + reads file line by line and uses a nasty state machine looking for magic strings to determine what goes where. + + :param filename: path to file to be read, usually ends with .pgp + :param verify: Should + :return: dict -> {"json": str, "pgp_signature": str, "hash": str, "verified": bool} + + json = json payload form file as string, can be loaded to dict with json.loads(read_gpg_file(filename)["json"]) + pgp_signature = string representation of signature, use this to verify signature. + hash = hash function used. Might be useful, might not. + verified = Whenever signature verification was successful + """ + # Magic strings to look for + gpg_msg_start = "-----BEGIN PGP SIGNED MESSAGE-----\n" + gpg_sig_start = "-----BEGIN PGP SIGNATURE-----\n" + gpg_sig_end = "-----END PGP SIGNATURE-----\n" + + # Empty strings to prepare return. + complete_msg = "" + out_string = "" + hash = "" + out_signature = "" + verified = None + + # Behold... the mighty state machine + msg_start_found = False + sig_start_found = False + with open(filename, "r") as f: + for line in f.readlines(): + complete_msg += line + if not msg_start_found: + if line == gpg_msg_start: + msg_start_found = True + continue + if line.startswith("Hash"): + hash += line.split(" ")[1].rstrip() # Remove unused whitespace + continue + if line == gpg_sig_start: + sig_start_found = True + continue + if not sig_start_found: + out_string += line.lstrip().rstrip() # Remove unused whitespace + if sig_start_found and msg_start_found and line != gpg_sig_end: + out_signature += line.lstrip().rstrip() # Remove unused whitespace + if line == gpg_sig_end: + break + if verify: + verified = gpg.verify(complete_msg).status + else: + verified = None + return {"json": out_string, "pgp_signature": out_signature, "hash": hash, "verified": verified} + + +def write_gpg_file(payload_to_sign: dict, filename: str, fingerprint=None) -> bool: + """ + :param payload_to_sign: dictonary that should be signed and written to disk. + :param filename: target filename WITHOUT .gpg extention! + :param fingerprint: OPTIONAL fingerprint (string) (Not yet implemented) + :return: True if any data was written, else prints error message and returns False + """ + if type(payload_to_sign) != dict: + print("Could not write {}, invalid payload. \n" + "Ensure to only pass object type: \"dict\"".format(filename)) + return False + + with open(filename+".gpg", "w") as f: + json.dump(payload_to_sign, f) + with open(filename+".gpg", "rb") as f: + signed_string = gpg.sign_file(f, keyid=fingerprint) + print("Signing with {}".format(fingerprint)) + with open(filename+".gpg", "w") as f: + if f.write(str(signed_string)): + return True + else: + print("Could not write {}, 0 bytes written!\n" + "This message should be impossible to reach, well done.".format(filename)) + return False + + +def verify_gpg_json(payload: str, signature: str) -> bool: + """ + Placeholder function to verify if pgp signed json is valid. + :param payload: Complete pgp message with headers and signature + :param signature: + :return: + """ + return True + diff --git a/setup.txt b/setup.txt new file mode 100644 index 0000000..2c95d2b --- /dev/null +++ b/setup.txt @@ -0,0 +1,4 @@ +wget https://files.pythonhosted.org/packages/96/6c/21f99b450d2f0821ff35343b9a7843b71e98de35192454606435c72991a8/gnupg-2.3.1.tar.gz +# activate venv +pip install -e gnupg-2.3.1 +