mirror of
https://gerrit.hackerspace.pl/hscloud
synced 2024-10-15 06:07:45 +00:00
Serge Bazanski
f97c9688d5
We also set --trust-model=always, as we explicitly ship GPG fingerprints, so there's no need to rely on GPG's web of trust shenanigans. Change-Id: If2976130315c044f1d1727c61a6f6d489c876a52
451 lines
14 KiB
Python
451 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
|
|
"""
|
|
A little tool to encrypt/decrypt git secrets. Kinda like password-store, but
|
|
more purpose specific and portable.
|
|
|
|
It generally expects to work with directory structures as follows:
|
|
|
|
foo/bar/secrets/plain: plaintext files
|
|
/cipher: ciphertext files, with names corresponding to
|
|
plaintext files
|
|
|
|
Note: currently all plaintext/cipher files are at a single level, ie.: there
|
|
cannot be any subdirectory within a /plain or /cipher directory.
|
|
|
|
There are multiple secret 'roots' like this in hscloud, notably:
|
|
|
|
- cluster/secrets
|
|
- hswaw/kube/secrets
|
|
|
|
In the future, some repository-based configuration might exist to specify these
|
|
roots in a nicer way, possibly with different target keys per root.
|
|
|
|
This tool a bit of a swiss army knife, and can be used in the following ways:
|
|
|
|
- as a CLI tool to encrypt/decrypt files directly
|
|
- as a library for its encryption/decryption methods, and for a SecretStore
|
|
API, which allows for basic programmatic access to secrets, decrypting
|
|
things if necessary
|
|
- as a CLI tool to 'synchronize' a directory containing plain/cipher files,
|
|
which means encrypting every new plaintext file (or new ciphertext file),
|
|
and re-encrypting all files whose keys are different from the keys list
|
|
defined in this file.
|
|
|
|
"""
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import tempfile
|
|
|
|
# Keys that are to be used to encrypt all secret roots.
|
|
keys = [
|
|
"63DFE737F078657CC8A51C00C29ADD73B3563D82", # q3k
|
|
"482FF104C29294AD1CAF827BA43890A3DE74ECC7", # inf
|
|
"F07205946C07EEB2041A72FBC60C64879534F768", # cz2
|
|
"0879F9FCA1C836677BB808C870FD60197E195C26", # implr
|
|
]
|
|
|
|
# Currently, Patryk's GPG key is expired. This hacks around that by pretending
|
|
# it's January 2021.
|
|
# TODO(q3k/patryk): remove this once Patryk updates his key.
|
|
systime = '20210101T000000'
|
|
|
|
_logger_name = __name__
|
|
if _logger_name == '__main__':
|
|
_logger_name = 'secretstore'
|
|
logger = logging.getLogger(_logger_name)
|
|
|
|
|
|
class CLIException(Exception):
|
|
pass
|
|
|
|
|
|
def encrypt(src, dst):
|
|
cmd = [
|
|
'gpg' ,
|
|
'--encrypt',
|
|
'--faked-system-time', systime,
|
|
'--trust-model', 'always',
|
|
'--armor',
|
|
'--batch', '--yes',
|
|
'--output', dst,
|
|
]
|
|
for k in keys:
|
|
cmd.append('--recipient')
|
|
cmd.append(k)
|
|
cmd.append(src)
|
|
subprocess.check_call(cmd)
|
|
|
|
|
|
def decrypt(src, dst):
|
|
cmd = ['gpg', '--decrypt', '--batch', '--yes', '--output', dst, src]
|
|
# catch stdout to make this code less chatty.
|
|
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
|
|
|
|
|
|
def _encryption_key_for_fingerprint(fp):
|
|
"""
|
|
Returns the encryption key ID for a given GPG fingerprint (eg. one from the
|
|
'keys' list.
|
|
"""
|
|
cmd = ['gpg', '-k', '--faked-system-time', systime, '--keyid-format', 'long', fp]
|
|
res = subprocess.check_output(cmd).decode()
|
|
|
|
# Sample output:
|
|
# pub rsa4096/70FD60197E195C26 2014-02-22 [SC] [expires: 2021-02-05]
|
|
# 0879F9FCA1C836677BB808C870FD60197E195C26
|
|
# uid [ultimate] Bartosz Stebel <bartoszstebel@gmail.com>
|
|
# uid [ultimate] Bartosz Stebel <implr@hackerspace.pl>
|
|
# sub rsa4096/E203C94E5CEBB3EF 2014-02-22 [E] [expires: 2021-02-05]
|
|
#
|
|
# We want to extract the 'sub' key with the [E] tag.
|
|
for line in res.split('\n'):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
parts = line.split()
|
|
if len(parts) < 4:
|
|
continue
|
|
if parts[0] != 'sub':
|
|
continue
|
|
|
|
if not parts[3].startswith('[') or not parts[3].endswith(']'):
|
|
continue
|
|
usages = parts[3].strip('[]')
|
|
if 'E' not in usages:
|
|
continue
|
|
|
|
# Okay, we found the encryption key.
|
|
return parts[1].split('/')[1]
|
|
|
|
raise Exception("Could not find encryption key ID for fingerprint {}".format(fp))
|
|
|
|
|
|
_encryption_keys_cache = None
|
|
def encryption_keys():
|
|
"""
|
|
Return all encryption keys associated with the keys array.
|
|
"""
|
|
global _encryption_keys_cache
|
|
if _encryption_keys_cache is None:
|
|
_encryption_keys_cache = [_encryption_key_for_fingerprint(fp) for fp in keys]
|
|
|
|
return _encryption_keys_cache
|
|
|
|
|
|
def encrypted_for(path):
|
|
"""
|
|
Return for which encryption keys is a given GPG ciphertext file encrypted.
|
|
"""
|
|
cmd = ['gpg', '--pinentry-mode', 'cancel', '--list-packets', path]
|
|
res = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode()
|
|
|
|
# Sample output:
|
|
# gpg: encrypted with 4096-bit RSA key, ID E203C94E5CEBB3EF, created 2014-02-22
|
|
# "Bartosz Stebel <bartoszstebel@gmail.com>"
|
|
# gpg: encrypted with 2048-bit RSA key, ID 5C1B6B69E9F5EABE, created 2013-01-29
|
|
# "Piotr Dobrowolski <piotr.tytus.dobrowolski@gmail.com>"
|
|
# gpg: encrypted with 2048-bit RSA key, ID 386E893E110BC55B, created 2012-01-10
|
|
# "Sergiusz Bazanski (Low Latency Consulting) <serge@lowlatency.ie>"
|
|
# gpg: public key decryption failed: Operation cancelled
|
|
# gpg: decryption failed: No secret key
|
|
# # off=0 ctb=85 tag=1 hlen=3 plen=268
|
|
# :pubkey enc packet: version 3, algo 1, keyid 386E893E110BC55B
|
|
# data: [2047 bits]
|
|
# # off=271 ctb=85 tag=1 hlen=3 plen=268
|
|
# :pubkey enc packet: version 3, algo 1, keyid 5C1B6B69E9F5EABE
|
|
# data: [2048 bits]
|
|
# # off=542 ctb=85 tag=1 hlen=3 plen=524
|
|
# :pubkey enc packet: version 3, algo 1, keyid E203C94E5CEBB3EF
|
|
# data: [4095 bits]
|
|
# # off=1069 ctb=d2 tag=18 hlen=2 plen=121 new-ctb
|
|
# :encrypted data packet:
|
|
# length: 121
|
|
# mdc_method: 2
|
|
|
|
keys = []
|
|
for line in res.split('\n'):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
parts = line.split()
|
|
if len(parts) < 9:
|
|
continue
|
|
|
|
|
|
if parts[:4] != [':pubkey', 'enc', 'packet:', 'version']:
|
|
continue
|
|
|
|
if parts[7] != 'keyid':
|
|
continue
|
|
|
|
keys.append(parts[8])
|
|
|
|
# Make unique.
|
|
return list(set(keys))
|
|
|
|
|
|
class SyncAction:
|
|
"""
|
|
SyncAction is a possible action taken to synchronize some secrets.
|
|
|
|
An action is some sort of side-effect bearing OS action (ie execution of
|
|
script or file move, or...) that can also 'describe' that it's acting - ie,
|
|
just return a human readable string of what it would be doing. These
|
|
describe descriptions are used for dry-runs of the secretstore sync
|
|
functionality.
|
|
"""
|
|
def describe(self):
|
|
return ""
|
|
|
|
def act(self):
|
|
pass
|
|
|
|
class SyncActionEncrypt:
|
|
def __init__(self, src, dst, reason):
|
|
self.src = src
|
|
self.dst = dst
|
|
self.reason = reason
|
|
|
|
def describe(self):
|
|
return f'Encrypting {os.path.split(self.src)[-1]} ({self.reason})'
|
|
|
|
def act(self):
|
|
return encrypt(self.src, self.dst)
|
|
|
|
|
|
class SyncActionDecrypt:
|
|
def __init__(self, src, dst, reason):
|
|
self.src = src
|
|
self.dst = dst
|
|
self.reason = reason
|
|
|
|
def describe(self):
|
|
return f'Decrypting {os.path.split(self.src)[-1]} ({self.reason})'
|
|
|
|
def act(self):
|
|
return decrypt(self.src, self.dst)
|
|
|
|
|
|
def sync(path: str, dry: bool):
|
|
"""Synchronize (decrypt and encrypt what's needed) a given secrets directory."""
|
|
|
|
# Turn the path into an absolute path just to make things safer.
|
|
path = os.path.abspath(path)
|
|
# Trim all trailing slashes to canonicalize.
|
|
path = path.rstrip('/')
|
|
|
|
plain_path = os.path.join(path, "plain")
|
|
cipher_path = os.path.join(path, "cipher")
|
|
|
|
# Ensure that at least one of the plain/cipher paths exist.
|
|
plain_exists = os.path.exists(plain_path)
|
|
cipher_exists = os.path.exists(cipher_path)
|
|
if not plain_exists and not cipher_exists:
|
|
raise CLIException('Given directory must contain a plain/ or cipher/ subdirectory.')
|
|
|
|
# Make missing directories.
|
|
if not plain_exists:
|
|
os.mkdir(plain_path)
|
|
if not cipher_exists:
|
|
os.mkdir(cipher_path)
|
|
|
|
# List files on both sides:
|
|
plain_files = [f for f in os.listdir(plain_path) if f != '.gitignore' and os.path.isfile(os.path.join(plain_path, f))]
|
|
cipher_files = [f for f in os.listdir(cipher_path) if os.path.isfile(os.path.join(cipher_path, f))]
|
|
|
|
# Helper function to turn a short filename within a directory to a pair
|
|
# of plain/cipher full paths.
|
|
def pc(p):
|
|
return os.path.join(plain_path, p), os.path.join(cipher_path, p)
|
|
|
|
# Make a set of all file names - no matter if only available as plain, as
|
|
# cipher, or as both.
|
|
all_files = set(plain_files + cipher_files)
|
|
|
|
# We'll be making a list of actions to perform to bring up given directory
|
|
# pair to a stable state.
|
|
actions = [] # type: List[SyncAction]
|
|
|
|
# First, for every possible file (either encrypted or decrypted), figure
|
|
# out which side is fresher based on file presence and mtime.
|
|
fresher = {} # type: Dict[str, str]
|
|
for p in all_files:
|
|
# Handle the easy case when the file only exists on one side.
|
|
if p not in cipher_files:
|
|
fresher[p] = 'plain'
|
|
continue
|
|
if p not in plain_files:
|
|
fresher[p] = 'cipher'
|
|
continue
|
|
|
|
plain, cipher = pc(p)
|
|
|
|
# Otherwise, we have both the cipher and plain version.
|
|
# Check if the decrypted version matches the plaintext version. If so,
|
|
# they're both equal.
|
|
|
|
f = tempfile.NamedTemporaryFile(delete=False)
|
|
f.close()
|
|
decrypt(cipher, f.name)
|
|
|
|
with open(f.name, 'rb') as fd:
|
|
decrypted_data = fd.read()
|
|
with open(plain, 'rb') as fc:
|
|
current_data = fc.read()
|
|
|
|
if decrypted_data == current_data:
|
|
fresher[p] = 'equal'
|
|
os.unlink(f.name)
|
|
continue
|
|
|
|
os.unlink(f.name)
|
|
|
|
# The plain and cipher versions differ. Let's choose based on mtime.
|
|
mtime_plain = os.path.getmtime(plain)
|
|
mtime_cipher = os.path.getmtime(cipher)
|
|
|
|
if mtime_plain > mtime_cipher:
|
|
fresher[p] = 'plain'
|
|
elif mtime_cipher > mtime_plain:
|
|
fresher[p] = 'cipher'
|
|
else:
|
|
raise CLIException(f'cipher/plain stalemate on {p}: contents differ, but files have same mtime')
|
|
|
|
# Find all files that need to be re-encrypted for changed keys.
|
|
reencrypt = set()
|
|
for p in cipher_files:
|
|
_, cipher = pc(p)
|
|
current = set(encrypted_for(cipher))
|
|
want = set(encryption_keys())
|
|
|
|
if current != want:
|
|
reencrypt.add(p)
|
|
|
|
# Okay, now actually construct a list of actions.
|
|
# First, all fresh==cipher keys need to be decrypted.
|
|
for p, v in fresher.items():
|
|
if v != 'cipher':
|
|
continue
|
|
|
|
plain, cipher = pc(p)
|
|
actions.append(SyncActionDecrypt(cipher, plain, "cipher version is newer"))
|
|
|
|
encrypted = set()
|
|
# Then, encrypt all fresh==plain files, and make note of what those
|
|
# are.
|
|
for p, v in fresher.items():
|
|
if v != 'plain':
|
|
continue
|
|
|
|
plain, cipher = pc(p)
|
|
actions.append(SyncActionEncrypt(plain, cipher, "plain version is newer"))
|
|
encrypted.add(p)
|
|
|
|
# Finally, re-encrypt all the files that aren't already being encrypted.
|
|
for p in reencrypt.difference(encrypted):
|
|
plain, cipher = pc(p)
|
|
actions.append(SyncActionEncrypt(plain, cipher, "needs to be re-encrypted for different keys"))
|
|
|
|
if len(actions) == 0:
|
|
logger.info('Nothing to do!')
|
|
else:
|
|
if dry:
|
|
logger.info('Would perform the following:')
|
|
else:
|
|
logger.info('Running actions...')
|
|
for a in actions:
|
|
logger.info(a.describe())
|
|
if not dry:
|
|
a.act()
|
|
|
|
|
|
class SecretStoreMissing(Exception):
|
|
pass
|
|
|
|
|
|
class SecretStore(object):
|
|
def __init__(self, plain_root, cipher_root):
|
|
self.proot = plain_root
|
|
self.croot = cipher_root
|
|
|
|
def exists(self, suffix):
|
|
p = os.path.join(self.proot, suffix)
|
|
c = os.path.join(self.croot, suffix)
|
|
return os.path.exists(c) or os.path.exists(p)
|
|
|
|
def plaintext(self, suffix):
|
|
p = os.path.join(self.proot, suffix)
|
|
c = os.path.join(self.croot, suffix)
|
|
|
|
has_p = os.path.exists(p)
|
|
has_c = os.path.exists(c)
|
|
|
|
if has_c and has_p and os.path.getctime(p) < os.path.getctime(c):
|
|
logger.info("Decrypting {} ({})...".format(suffix, c))
|
|
decrypt(c, p)
|
|
|
|
return p
|
|
|
|
def open(self, suffix, mode, *a, **kw):
|
|
p = os.path.join(self.proot, suffix)
|
|
c = os.path.join(self.croot, suffix)
|
|
if 'w' in mode:
|
|
return open(p, mode, *a, **kw)
|
|
|
|
if not self.exists(suffix):
|
|
raise SecretStoreMissing("Secret {} does not exist".format(suffix))
|
|
|
|
if not os.path.exists(p) or os.path.getctime(p) < os.path.getctime(c):
|
|
logger.info("Decrypting {} ({})...".format(suffix, c))
|
|
decrypt(c, p)
|
|
|
|
return open(p, mode, *a, **kw)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Manage hscloud git-based secrets.')
|
|
subparsers = parser.add_subparsers(dest='mode')
|
|
|
|
parser_decrypt = subparsers.add_parser('decrypt', help='decrypt a single secret file')
|
|
parser_decrypt.add_argument('input', type=str, help='encrypted file path')
|
|
parser_decrypt.add_argument('output', type=str, default='-', help='decrypted file path file path (or - for stdout)')
|
|
|
|
parser_encrypt = subparsers.add_parser('encrypt', help='encrypt a single secret file')
|
|
parser_encrypt.add_argument('input', type=str, help='plaintext file path')
|
|
parser_encrypt.add_argument('output', type=str, default='-', help='encrypted file path file path (or - for stdout)')
|
|
|
|
parser_sync = subparsers.add_parser('sync', help='Synchronize a canonically formatted secrets/{plain,cipher} directory')
|
|
parser_sync.add_argument('dir', type=str, help='Path to secrets directory to synchronize')
|
|
parser_sync.add_argument('--dry', dest='dry', action='store_true')
|
|
parser_sync.set_defaults(dry=False)
|
|
|
|
logging.basicConfig(level='INFO')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.mode == None:
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
try:
|
|
if args.mode == 'encrypt':
|
|
encrypt(args.input, args.output)
|
|
elif args.mode == 'decrypt':
|
|
decrypt(args.input, args.output)
|
|
elif args.mode == 'sync':
|
|
sync(args.dir, dry=args.dry)
|
|
else:
|
|
# ???
|
|
raise Exception('invalid mode {}'.format(args.mode))
|
|
except CLIException as e:
|
|
logger.error(e)
|
|
sys.exit(1)
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main() or 0)
|