inital
This commit is contained in:
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
venv*
|
||||||
|
.idea
|
||||||
|
gnupg*
|
||||||
|
python-*
|
||||||
|
__pycache__
|
||||||
|
*.gpg
|
||||||
33
demo.py
Normal file
33
demo.py
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from ob_save import get_fingerprint_by_email, write_gpg_file, read_gpg_file
|
||||||
|
import json
|
||||||
|
|
||||||
|
""" First we open the normal json data and loads it as a dict, this is our dummy data. "raw_json" """
|
||||||
|
with open("json_stuff.txt", "r") as json_file:
|
||||||
|
raw_json = json.load(json_file)
|
||||||
|
|
||||||
|
""" Lets show the raw json, and verify that it is a dictionary"""
|
||||||
|
print("#"*30, "\n")
|
||||||
|
print("raw_json content:", raw_json)
|
||||||
|
print("raw_json type: ", type(raw_json))
|
||||||
|
print("\n", "#"*30, "\n")
|
||||||
|
|
||||||
|
""" Setup constants """
|
||||||
|
signature_email = "b1-systems@example.com"
|
||||||
|
gpg_fingerprint = get_fingerprint_by_email(signature_email)
|
||||||
|
|
||||||
|
""" Lets sign and write the content to a new file"""
|
||||||
|
print("Writing json to file.")
|
||||||
|
write_gpg_file(raw_json, "target_filename.txt", fingerprint=gpg_fingerprint)
|
||||||
|
|
||||||
|
""" Lets read back the content, AND verify that the signature is valid """
|
||||||
|
gpg_content = read_gpg_file("more_json_stuff.txt"+".gpg", verify=True)
|
||||||
|
|
||||||
|
""" gpg_content is a dictonary with more than just the json, so lets split it in its nessecary parts."""
|
||||||
|
readback_json = json.loads(gpg_content["json"])
|
||||||
|
verified_status = gpg_content["verified"]
|
||||||
|
|
||||||
|
print("\n", "#"*30, "\n")
|
||||||
|
print("Is original json and readback json identical?: {}".format(readback_json == raw_json))
|
||||||
|
print("Is signature valid?: {}".format(verified_status))
|
||||||
40
json_stuff.txt
Normal file
40
json_stuff.txt
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
{
|
||||||
|
"type": "result",
|
||||||
|
"timestamp": "2021-06-23T22:50:21Z",
|
||||||
|
"ping": {
|
||||||
|
"jitter": 0.23300000000000001,
|
||||||
|
"latency": 13.122999999999999
|
||||||
|
},
|
||||||
|
"download": {
|
||||||
|
"bandwidth": 11050354,
|
||||||
|
"bytes": 86103584,
|
||||||
|
"elapsed": 7801
|
||||||
|
},
|
||||||
|
"upload": {
|
||||||
|
"bandwidth": 4113077,
|
||||||
|
"bytes": 14873760,
|
||||||
|
"elapsed": 3614
|
||||||
|
},
|
||||||
|
"packetLoss": 0,
|
||||||
|
"isp": "Deutsche Telekom AG",
|
||||||
|
"interface": {
|
||||||
|
"internalIp": "192.168.178.60",
|
||||||
|
"name": "eth0",
|
||||||
|
"macAddr": "DC:A6:32:5D:54:50",
|
||||||
|
"isVpn": false,
|
||||||
|
"externalIp": "91.19.224.10"
|
||||||
|
},
|
||||||
|
"server": {
|
||||||
|
"id": 36896,
|
||||||
|
"name": "AlphaCron",
|
||||||
|
"location": "Apfelstädt",
|
||||||
|
"country": "Germany",
|
||||||
|
"host": "speed.alphacron.de",
|
||||||
|
"port": 8080,
|
||||||
|
"ip": "134.97.32.23"
|
||||||
|
},
|
||||||
|
"result": {
|
||||||
|
"id": "d9e6c6d6-55c5-4359-9bd0-a48190ed27e9",
|
||||||
|
"url": "https://www.speedtest.net/result/c/d9e6c6d6-55c5-4359-9bd0-a48190ed27e9"
|
||||||
|
}
|
||||||
|
}
|
||||||
119
ob_save.py
Normal file
119
ob_save.py
Normal file
@@ -0,0 +1,119 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import json
|
||||||
|
import gnupg
|
||||||
|
|
||||||
|
""" Now rewrite all this doodoo into a proper class """
|
||||||
|
|
||||||
|
gnupg_homedir = "./gnupg_home/"
|
||||||
|
keyring_file = "pubring.kbx"
|
||||||
|
json_filename = 'json_stuff.txt'
|
||||||
|
|
||||||
|
gpg = gnupg.GPG(gnupghome=gnupg_homedir, keyring=gnupg_homedir+keyring_file)
|
||||||
|
|
||||||
|
|
||||||
|
def get_fingerprint_by_email(email):
|
||||||
|
for key in gpg.list_keys():
|
||||||
|
if email in key["uids"][0]:
|
||||||
|
return key["fingerprint"]
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_keyid_by_email(email):
|
||||||
|
for key in gpg.list_keys():
|
||||||
|
if email in key["uids"][0]:
|
||||||
|
return key["keyid"]
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def read_gpg_file(filename: str, verify: bool = None) -> dict:
|
||||||
|
"""
|
||||||
|
Opposite function of write_gpg_file()
|
||||||
|
splits message, signature and hash from input file.
|
||||||
|
reads file line by line and uses a nasty state machine looking for magic strings to determine what goes where.
|
||||||
|
|
||||||
|
:param filename: path to file to be read, usually ends with .pgp
|
||||||
|
:param verify: Should
|
||||||
|
:return: dict -> {"json": str, "pgp_signature": str, "hash": str, "verified": bool}
|
||||||
|
|
||||||
|
json = json payload form file as string, can be loaded to dict with json.loads(read_gpg_file(filename)["json"])
|
||||||
|
pgp_signature = string representation of signature, use this to verify signature.
|
||||||
|
hash = hash function used. Might be useful, might not.
|
||||||
|
verified = Whenever signature verification was successful
|
||||||
|
"""
|
||||||
|
# Magic strings to look for
|
||||||
|
gpg_msg_start = "-----BEGIN PGP SIGNED MESSAGE-----\n"
|
||||||
|
gpg_sig_start = "-----BEGIN PGP SIGNATURE-----\n"
|
||||||
|
gpg_sig_end = "-----END PGP SIGNATURE-----\n"
|
||||||
|
|
||||||
|
# Empty strings to prepare return.
|
||||||
|
complete_msg = ""
|
||||||
|
out_string = ""
|
||||||
|
hash = ""
|
||||||
|
out_signature = ""
|
||||||
|
verified = None
|
||||||
|
|
||||||
|
# Behold... the mighty state machine
|
||||||
|
msg_start_found = False
|
||||||
|
sig_start_found = False
|
||||||
|
with open(filename, "r") as f:
|
||||||
|
for line in f.readlines():
|
||||||
|
complete_msg += line
|
||||||
|
if not msg_start_found:
|
||||||
|
if line == gpg_msg_start:
|
||||||
|
msg_start_found = True
|
||||||
|
continue
|
||||||
|
if line.startswith("Hash"):
|
||||||
|
hash += line.split(" ")[1].rstrip() # Remove unused whitespace
|
||||||
|
continue
|
||||||
|
if line == gpg_sig_start:
|
||||||
|
sig_start_found = True
|
||||||
|
continue
|
||||||
|
if not sig_start_found:
|
||||||
|
out_string += line.lstrip().rstrip() # Remove unused whitespace
|
||||||
|
if sig_start_found and msg_start_found and line != gpg_sig_end:
|
||||||
|
out_signature += line.lstrip().rstrip() # Remove unused whitespace
|
||||||
|
if line == gpg_sig_end:
|
||||||
|
break
|
||||||
|
if verify:
|
||||||
|
verified = gpg.verify(complete_msg).status
|
||||||
|
else:
|
||||||
|
verified = None
|
||||||
|
return {"json": out_string, "pgp_signature": out_signature, "hash": hash, "verified": verified}
|
||||||
|
|
||||||
|
|
||||||
|
def write_gpg_file(payload_to_sign: dict, filename: str, fingerprint=None) -> bool:
|
||||||
|
"""
|
||||||
|
:param payload_to_sign: dictonary that should be signed and written to disk.
|
||||||
|
:param filename: target filename WITHOUT .gpg extention!
|
||||||
|
:param fingerprint: OPTIONAL fingerprint (string) (Not yet implemented)
|
||||||
|
:return: True if any data was written, else prints error message and returns False
|
||||||
|
"""
|
||||||
|
if type(payload_to_sign) != dict:
|
||||||
|
print("Could not write {}, invalid payload. \n"
|
||||||
|
"Ensure to only pass object type: \"dict\"".format(filename))
|
||||||
|
return False
|
||||||
|
|
||||||
|
with open(filename+".gpg", "w") as f:
|
||||||
|
json.dump(payload_to_sign, f)
|
||||||
|
with open(filename+".gpg", "rb") as f:
|
||||||
|
signed_string = gpg.sign_file(f, keyid=fingerprint)
|
||||||
|
print("Signing with {}".format(fingerprint))
|
||||||
|
with open(filename+".gpg", "w") as f:
|
||||||
|
if f.write(str(signed_string)):
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print("Could not write {}, 0 bytes written!\n"
|
||||||
|
"This message should be impossible to reach, well done.".format(filename))
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def verify_gpg_json(payload: str, signature: str) -> bool:
|
||||||
|
"""
|
||||||
|
Placeholder function to verify if pgp signed json is valid.
|
||||||
|
:param payload: Complete pgp message with headers and signature
|
||||||
|
:param signature:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
return True
|
||||||
|
|
||||||
Reference in New Issue
Block a user