#!/usr/bin/env python3 """ A little tool to encrypt/decrypt git secrets. Kinda like password-store, but more purpose specific and portable. It generally expects to work with directory structures as follows: foo/bar/secrets/plain: plaintext files /cipher: ciphertext files, with names corresponding to plaintext files Note: currently all plaintext/cipher files are at a single level, ie.: there cannot be any subdirectory within a /plain or /cipher directory. There are multiple secret 'roots' like this in hscloud, notably: - cluster/secrets - hswaw/kube/secrets In the future, some repository-based configuration might exist to specify these roots in a nicer way, possibly with different target keys per root. This tool a bit of a swiss army knife, and can be used in the following ways: - as a CLI tool to encrypt/decrypt files directly - as a library for its encryption/decryption methods, and for a SecretStore API, which allows for basic programmatic access to secrets, decrypting things if necessary - as a CLI tool to 'synchronize' a directory containing plain/cipher files, which means encrypting every new plaintext file (or new ciphertext file), and re-encrypting all files whose keys are different from the keys list defined in this file. """ import argparse import logging import os import sys import subprocess import tempfile # Keys that are to be used to encrypt all secret roots. keys = [ "63DFE737F078657CC8A51C00C29ADD73B3563D82", # q3k "482FF104C29294AD1CAF827BA43890A3DE74ECC7", # inf "F07205946C07EEB2041A72FBC60C64879534F768", # cz2 "0879F9FCA1C836677BB808C870FD60197E195C26", # implr ] # Currently, Patryk's GPG key is expired. This hacks around that by pretending # it's January 2021. # TODO(q3k/patryk): remove this once Patryk updates his key. systime = '20210101T000000' _logger_name = __name__ if _logger_name == '__main__': _logger_name = 'secretstore' logger = logging.getLogger(_logger_name) class CLIException(Exception): pass def encrypt(src, dst): cmd = [ 'gpg' , '--encrypt', '--faked-system-time', systime, '--trust-model', 'always', '--armor', '--batch', '--yes', '--output', dst, ] for k in keys: cmd.append('--recipient') cmd.append(k) cmd.append(src) subprocess.check_call(cmd) def decrypt(src, dst): cmd = ['gpg', '--decrypt', '--batch', '--yes', '--output', dst, src] # catch stdout to make this code less chatty. subprocess.check_output(cmd, stderr=subprocess.STDOUT) def _encryption_key_for_fingerprint(fp): """ Returns the encryption key ID for a given GPG fingerprint (eg. one from the 'keys' list. """ cmd = ['gpg', '-k', '--faked-system-time', systime, '--keyid-format', 'long', fp] res = subprocess.check_output(cmd).decode() # Sample output: # pub rsa4096/70FD60197E195C26 2014-02-22 [SC] [expires: 2021-02-05] # 0879F9FCA1C836677BB808C870FD60197E195C26 # uid [ultimate] Bartosz Stebel # uid [ultimate] Bartosz Stebel # sub rsa4096/E203C94E5CEBB3EF 2014-02-22 [E] [expires: 2021-02-05] # # We want to extract the 'sub' key with the [E] tag. for line in res.split('\n'): line = line.strip() if not line: continue parts = line.split() if len(parts) < 4: continue if parts[0] != 'sub': continue if not parts[3].startswith('[') or not parts[3].endswith(']'): continue usages = parts[3].strip('[]') if 'E' not in usages: continue # Okay, we found the encryption key. return parts[1].split('/')[1] raise Exception("Could not find encryption key ID for fingerprint {}".format(fp)) _encryption_keys_cache = None def encryption_keys(): """ Return all encryption keys associated with the keys array. """ global _encryption_keys_cache if _encryption_keys_cache is None: _encryption_keys_cache = [_encryption_key_for_fingerprint(fp) for fp in keys] return _encryption_keys_cache def encrypted_for(path): """ Return for which encryption keys is a given GPG ciphertext file encrypted. """ cmd = ['gpg', '--pinentry-mode', 'cancel', '--list-packets', path] res = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode() # Sample output: # gpg: encrypted with 4096-bit RSA key, ID E203C94E5CEBB3EF, created 2014-02-22 # "Bartosz Stebel " # gpg: encrypted with 2048-bit RSA key, ID 5C1B6B69E9F5EABE, created 2013-01-29 # "Piotr Dobrowolski " # gpg: encrypted with 2048-bit RSA key, ID 386E893E110BC55B, created 2012-01-10 # "Sergiusz Bazanski (Low Latency Consulting) " # gpg: public key decryption failed: Operation cancelled # gpg: decryption failed: No secret key # # off=0 ctb=85 tag=1 hlen=3 plen=268 # :pubkey enc packet: version 3, algo 1, keyid 386E893E110BC55B # data: [2047 bits] # # off=271 ctb=85 tag=1 hlen=3 plen=268 # :pubkey enc packet: version 3, algo 1, keyid 5C1B6B69E9F5EABE # data: [2048 bits] # # off=542 ctb=85 tag=1 hlen=3 plen=524 # :pubkey enc packet: version 3, algo 1, keyid E203C94E5CEBB3EF # data: [4095 bits] # # off=1069 ctb=d2 tag=18 hlen=2 plen=121 new-ctb # :encrypted data packet: # length: 121 # mdc_method: 2 keys = [] for line in res.split('\n'): line = line.strip() if not line: continue parts = line.split() if len(parts) < 9: continue if parts[:4] != [':pubkey', 'enc', 'packet:', 'version']: continue if parts[7] != 'keyid': continue keys.append(parts[8]) # Make unique. return list(set(keys)) class SyncAction: """ SyncAction is a possible action taken to synchronize some secrets. An action is some sort of side-effect bearing OS action (ie execution of script or file move, or...) that can also 'describe' that it's acting - ie, just return a human readable string of what it would be doing. These describe descriptions are used for dry-runs of the secretstore sync functionality. """ def describe(self): return "" def act(self): pass class SyncActionEncrypt: def __init__(self, src, dst, reason): self.src = src self.dst = dst self.reason = reason def describe(self): return f'Encrypting {os.path.split(self.src)[-1]} ({self.reason})' def act(self): return encrypt(self.src, self.dst) class SyncActionDecrypt: def __init__(self, src, dst, reason): self.src = src self.dst = dst self.reason = reason def describe(self): return f'Decrypting {os.path.split(self.src)[-1]} ({self.reason})' def act(self): return decrypt(self.src, self.dst) def sync(path: str, dry: bool): """Synchronize (decrypt and encrypt what's needed) a given secrets directory.""" # Turn the path into an absolute path just to make things safer. path = os.path.abspath(path) # Trim all trailing slashes to canonicalize. path = path.rstrip('/') plain_path = os.path.join(path, "plain") cipher_path = os.path.join(path, "cipher") # Ensure that at least one of the plain/cipher paths exist. plain_exists = os.path.exists(plain_path) cipher_exists = os.path.exists(cipher_path) if not plain_exists and not cipher_exists: raise CLIException('Given directory must contain a plain/ or cipher/ subdirectory.') # Make missing directories. if not plain_exists: os.mkdir(plain_path) if not cipher_exists: os.mkdir(cipher_path) # List files on both sides: plain_files = [f for f in os.listdir(plain_path) if f != '.gitignore' and os.path.isfile(os.path.join(plain_path, f))] cipher_files = [f for f in os.listdir(cipher_path) if os.path.isfile(os.path.join(cipher_path, f))] # Helper function to turn a short filename within a directory to a pair # of plain/cipher full paths. def pc(p): return os.path.join(plain_path, p), os.path.join(cipher_path, p) # Make a set of all file names - no matter if only available as plain, as # cipher, or as both. all_files = set(plain_files + cipher_files) # We'll be making a list of actions to perform to bring up given directory # pair to a stable state. actions = [] # type: List[SyncAction] # First, for every possible file (either encrypted or decrypted), figure # out which side is fresher based on file presence and mtime. fresher = {} # type: Dict[str, str] for p in all_files: # Handle the easy case when the file only exists on one side. if p not in cipher_files: fresher[p] = 'plain' continue if p not in plain_files: fresher[p] = 'cipher' continue plain, cipher = pc(p) # Otherwise, we have both the cipher and plain version. # Check if the decrypted version matches the plaintext version. If so, # they're both equal. f = tempfile.NamedTemporaryFile(delete=False) f.close() decrypt(cipher, f.name) with open(f.name, 'rb') as fd: decrypted_data = fd.read() with open(plain, 'rb') as fc: current_data = fc.read() if decrypted_data == current_data: fresher[p] = 'equal' os.unlink(f.name) continue os.unlink(f.name) # The plain and cipher versions differ. Let's choose based on mtime. mtime_plain = os.path.getmtime(plain) mtime_cipher = os.path.getmtime(cipher) if mtime_plain > mtime_cipher: fresher[p] = 'plain' elif mtime_cipher > mtime_plain: fresher[p] = 'cipher' else: raise CLIException(f'cipher/plain stalemate on {p}: contents differ, but files have same mtime') # Find all files that need to be re-encrypted for changed keys. reencrypt = set() for p in cipher_files: _, cipher = pc(p) current = set(encrypted_for(cipher)) want = set(encryption_keys()) if current != want: reencrypt.add(p) # Okay, now actually construct a list of actions. # First, all fresh==cipher keys need to be decrypted. for p, v in fresher.items(): if v != 'cipher': continue plain, cipher = pc(p) actions.append(SyncActionDecrypt(cipher, plain, "cipher version is newer")) encrypted = set() # Then, encrypt all fresh==plain files, and make note of what those # are. for p, v in fresher.items(): if v != 'plain': continue plain, cipher = pc(p) actions.append(SyncActionEncrypt(plain, cipher, "plain version is newer")) encrypted.add(p) # Finally, re-encrypt all the files that aren't already being encrypted. for p in reencrypt.difference(encrypted): plain, cipher = pc(p) actions.append(SyncActionEncrypt(plain, cipher, "needs to be re-encrypted for different keys")) if len(actions) == 0: logger.info('Nothing to do!') else: if dry: logger.info('Would perform the following:') else: logger.info('Running actions...') for a in actions: logger.info(a.describe()) if not dry: a.act() class SecretStoreMissing(Exception): pass class SecretStore(object): def __init__(self, plain_root, cipher_root): self.proot = plain_root self.croot = cipher_root def exists(self, suffix): p = os.path.join(self.proot, suffix) c = os.path.join(self.croot, suffix) return os.path.exists(c) or os.path.exists(p) def plaintext(self, suffix): p = os.path.join(self.proot, suffix) c = os.path.join(self.croot, suffix) has_p = os.path.exists(p) has_c = os.path.exists(c) if has_c and has_p and os.path.getctime(p) < os.path.getctime(c): logger.info("Decrypting {} ({})...".format(suffix, c)) decrypt(c, p) return p def open(self, suffix, mode, *a, **kw): p = os.path.join(self.proot, suffix) c = os.path.join(self.croot, suffix) if 'w' in mode: return open(p, mode, *a, **kw) if not self.exists(suffix): raise SecretStoreMissing("Secret {} does not exist".format(suffix)) if not os.path.exists(p) or os.path.getctime(p) < os.path.getctime(c): logger.info("Decrypting {} ({})...".format(suffix, c)) decrypt(c, p) return open(p, mode, *a, **kw) def main(): parser = argparse.ArgumentParser(description='Manage hscloud git-based secrets.') subparsers = parser.add_subparsers(dest='mode') parser_decrypt = subparsers.add_parser('decrypt', help='decrypt a single secret file') parser_decrypt.add_argument('input', type=str, help='encrypted file path') parser_decrypt.add_argument('output', type=str, default='-', help='decrypted file path file path (or - for stdout)') parser_encrypt = subparsers.add_parser('encrypt', help='encrypt a single secret file') parser_encrypt.add_argument('input', type=str, help='plaintext file path') parser_encrypt.add_argument('output', type=str, default='-', help='encrypted file path file path (or - for stdout)') parser_sync = subparsers.add_parser('sync', help='Synchronize a canonically formatted secrets/{plain,cipher} directory') parser_sync.add_argument('dir', type=str, help='Path to secrets directory to synchronize') parser_sync.add_argument('--dry', dest='dry', action='store_true') parser_sync.set_defaults(dry=False) logging.basicConfig(level='INFO') args = parser.parse_args() if args.mode == None: parser.print_help() sys.exit(1) try: if args.mode == 'encrypt': encrypt(args.input, args.output) elif args.mode == 'decrypt': decrypt(args.input, args.output) elif args.mode == 'sync': sync(args.dir, dry=args.dry) else: # ??? raise Exception('invalid mode {}'.format(args.mode)) except CLIException as e: logger.error(e) sys.exit(1) if __name__ == '__main__': sys.exit(main() or 0)