#!/usr/bin/env python3
'''
bling.py - extract keys from macOS keychains.

installation:

    pip install pytz hexdump vivisect-vstruct-wb tabulate argparse pycryptodome

usage:

    python bling.py /path/to/keychain-db <password> ./path/to/output/directory

references:

  - https://repo.zenk-security.com/Forensic/Keychain%20Analysis%20with%20Mac%20OS%20X%20Memory%20Forensics.pdf
  - https://github.com/libyal/dtformats/blob/master/documentation/MacOS%20keychain%20database%20file%20format.asciidoc

author: Willi Ballenthin
email: william.ballenthin@fireeye.com
license: Apache 2.0
'''
# TODO: detect invalid password
import os
import os.path
import sys
import copy
import string
# `struct` is used directly throughout this file; import it explicitly rather
# than relying on it leaking out of the `vstruct.primitives` star import below.
import struct
import hashlib
import logging
import binascii
import datetime
import itertools
from pprint import pprint

import pytz
import hexdump
import vstruct
from vstruct.primitives import *
import tabulate
import argparse
# from pycryptodome
from Crypto.Cipher import DES3
from Crypto.Util.Padding import unpad
import Crypto.Protocol.KDF


logger = logging.getLogger('osx.bling')


class v_greedy_bytes(v_bytes):
    '''
    a v_bytes byte array that consumes to the end of the given buffer.
    '''
    def vsParse(self, fbytes, offset=0):
        '''
        take everything from `offset` to the end of `fbytes` as the value.

        Returns:
          int: `len(fbytes)`, i.e. the whole buffer is reported as consumed.
        '''
        self._vs_value = fbytes[offset:]
        return len(fbytes)


class RECORD_HEADER(vstruct.VStruct):
    '''
    generic header found at the start of every record.

    the number of attribute offset slots depends on the table schema,
    so the header is constructed with the schema's attribute list.
    '''
    def __init__(self, attrs):
        '''
        Args:
          attrs (list): attribute descriptors from the table schema;
            only its length is used here.
        '''
        vstruct.VStruct.__init__(self)
        self.RecordSize = v_uint32(bigend=True)
        self.RecordNumber = v_uint32(bigend=True)
        self.unk1 = v_uint32(bigend=True)
        self.unk2 = v_uint32(bigend=True)
        self.BlobSize = v_uint32(bigend=True)
        self.zero = v_uint32(bigend=True)
        # offset 0x18
        self.AttributeOffsets = vstruct.VArray([v_uint32(bigend=True) for _ in range(len(attrs))])
        # six uint32 fields (0x18 bytes) followed by one uint32 per attribute.
        self.blob_data_offset = 0x18 + (4 * len(attrs))
        # both byte arrays are resized in `pcb_BlobSize`.
        self.BlobData = v_bytes(size=0)
        self.attribute_data_offset = self.blob_data_offset
        self.AttributeData = v_bytes(size=0)

    def pcb_BlobSize(self):
        # `pcb_<field>` is the vstruct parse-callback naming convention:
        # size the trailing regions from the sizes parsed so far.
        self['BlobData'].vsSetLength(int(self.BlobSize))
        self['AttributeData'].vsSetLength(int(self.RecordSize) - self.blob_data_offset - int(self.BlobSize))
        self.attribute_data_offset = self.blob_data_offset + int(self.BlobSize)


class Record:
    '''
    a single parsed table row: generic header, attributes, and blob.
    '''
    def __init__(self, schema, buf):
        '''
        Args:
          schema: from `Keychain.get_table_schema()`.
          buf (bytes): buffer to parse for a record.

        Raises:
          TypeError: when the schema references an attribute format or table
            whose parser is `NotImplemented` (calling it fails).
        '''
        #
        # diagram:
        #
        #   +-----------+-----------+-----------+-----------+
        #   | rec size    rec index   unk1        unk2      |
        #   +-----------+-----------+-----------+-----------+
        #   | blob size   0x0       | attribute offsets     |  <-- attributes are declared in schema,
        #   +-----------+-----------+                       |      record structure based on table.
        #   |                                               |
        #   +-----------+-----------+-----------+-----------+
        #   | blob data (parsed into "blob")                |  \
        #   |                                               |   > blob size
        #   |                                               |  /
        #   +-----------+-----------+-----------+-----------+
        #   | attribute data                                |
        #   |                                               |
        #   |                                               |
        #   +-----------+-----------+-----------+-----------+  <-- rec size
        #
        self.buf = buf

        # this is the generic header, contains record size, record number, etc.
        self.header = RECORD_HEADER(schema['attrs'])
        self.header.vsParse(buf)

        self.attrs = {}
        for i, attr_desc in enumerate(schema['attrs']):
            attr_offset = int(self.header.AttributeOffsets[i])
            if attr_offset == 0:
                # offset == 0 signals the attribute is empty; leave it out of `attrs`.
                continue

            # because zero is reserved for "empty", stored offsets are shifted by 1.
            #
            # this offset is relative to the start of the record.
            attr_offset = attr_offset - 1

            attr_buf = buf[attr_offset:]
            attr = ATTRIBUTE_PARSERS[int(attr_desc['AttributeFormat'])]()
            attr.vsParse(attr_buf)

            self.attrs[str(attr_desc['AttributeName'])] = attr

        self.blob = BLOB_PARSERS[int(schema['RelationID'])]()
        self.blob.vsParse(self.header.BlobData)


CSSM_DL_DB = v_enum()
# Schema Management
CSSM_DL_DB.SCHEMA_INFO = 0x00000000  # Schema information
CSSM_DL_DB.SCHEMA_INDEXES = 0x00000001  # Schema indexes
CSSM_DL_DB.SCHEMA_ATTRIBUTES = 0x00000002  # Schema attributes
CSSM_DL_DB.SCHEMA_PARSING_MODULE = 0x00000003  # Schema parsing module
# Open Group Application
CSSM_DL_DB.RECORD_ANY = 0x0000000A  # Temporary table type.
CSSM_DL_DB.RECORD_CERT = 0x0000000B  # Certificates
CSSM_DL_DB.RECORD_CRL = 0x0000000C  # Certificate Revocation List
CSSM_DL_DB.RECORD_POLICY = 0x0000000D  # Policy
CSSM_DL_DB.RECORD_GENERIC = 0x0000000E  # Generic information
CSSM_DL_DB.RECORD_PUBLIC_KEY = 0x0000000F  # Public key
CSSM_DL_DB.RECORD_PRIVATE_KEY = 0x00000010  # Private key
CSSM_DL_DB.RECORD_SYMMETRIC_KEY = 0x00000011  # Symmetric key
CSSM_DL_DB.RECORD_ALL_KEY = 0x00000012  # Temporary table type
# Industry at Large Applications
CSSM_DL_DB.RECORD_GENERIC_PASSWORD = 0x80000000  # User credential
CSSM_DL_DB.RECORD_INTERNET_PASSWORD = 0x80000001  # User credential on the Internet in particular
CSSM_DL_DB.RECORD_APPLESHARE_PASSWORD = 0x80000002  # (Deprecated)
CSSM_DL_DB.RECORD_USER_TRUST = 0x80000003  # User-defined certificates
CSSM_DL_DB.RECORD_X509_CRL = 0x80000004  # X.509 Certificate Revocation List
CSSM_DL_DB.RECORD_UNLOCK_REFERRAL = 0x80000005  # Unlock referral
CSSM_DL_DB.RECORD_EXTENDED_ATTRIBUTE = 0x80000006  # Extended attribute for database management
CSSM_DL_DB.RECORD_X509_CERTIFICATE = 0x80001000  # X.509 Certificates
CSSM_DL_DB.RECORD_METADATA = 0x80008000  # Metadata information


class EMPTY_BLOB(vstruct.VStruct):
    '''
    blob parser for tables that carry no blob data; decrypts to nothing.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)

    def decrypt(self, keychain):
        return {}


class COMMON_BLOB(vstruct.VStruct):
    '''
    magic/version prefix shared by the key and database blobs.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.Magic = v_uint32(bigend=True)
        self.BlobVersion = v_uint32(bigend=True)

    def pcb_Magic(self):
        if self.Magic != 0xfade0711:
            raise ValueError('invalid COMMON_BLOB magic')


class DB_PARAMETERS(vstruct.VStruct):
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.IdleTimeout = v_uint32(bigend=True)  # uint32
        self.LockOnSleep = v_uint32(bigend=True)  # uint8


class DB_BLOB(vstruct.VStruct):
    '''
    blob of the metadata table; carries the password-wrapped database key.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.CommonBlob = COMMON_BLOB()
        self.StartCryptoBlob = v_uint32(bigend=True)
        self.TotalLength = v_uint32(bigend=True)
        self.RandomSignature = v_bytes(size=0x10)
        self.Sequence = v_uint32(bigend=True)
        self.Params = DB_PARAMETERS()
        self.Salt = v_bytes(size=0x14)
        self.IV = v_bytes(size=8)
        self.BlobSignature = v_bytes(size=0x14)
        self.unk2 = vstruct.VArray([v_uint32(bigend=True) for _ in range(7)])
        self.EncryptedDBKey = v_bytes(size=0x30)

    def decrypt(self, keychain):
        '''
        derive the master key from `keychain.password` and unwrap the database key.

        Returns:
          dict: with keys `master_key`, `db_key`, and `plaintext` (same as `db_key`).

        Raises:
          ValueError: from `unpad` when the decrypted padding is invalid.
        '''
        # magic: number of rounds = 1000
        # magic: key size = 24
        master_key = Crypto.Protocol.KDF.PBKDF2(keychain.password, self.Salt, count=1000, dkLen=24)

        des3 = DES3.new(master_key, DES3.MODE_CBC, self.IV)
        # pkcs#7 padding, 3DES block size (8 bytes)
        # magic: size of key = 24 bytes
        db_key = unpad(des3.decrypt(self.EncryptedDBKey), 8)[:24]

        return {
            'master_key': master_key,
            'db_key': db_key,
            'plaintext': db_key,
        }


class SSGP(vstruct.VStruct):
    '''
    `ssgp` header: 4-byte magic followed by a 16-byte label.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.Magic = v_bytes(size=4)
        self.Label = v_bytes(size=0x10)

    def pcb_Magic(self):
        if self.Magic != b'ssgp':
            raise ValueError('invalid SSGP header')


def parse_ssgp_label(label):
    '''
    parse a buffer into an SSGP label and return the id.

    Args:
      label: attribute object whose `.data` holds the raw SSGP bytes.
    '''
    ssgp = SSGP()
    ssgp.vsParse(label.data)
    return ssgp.Label


class SYMMETRIC_KEY_BLOB(vstruct.VStruct):
    '''
    blob of the symmetric key table: a key wrapped with the database key.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.CommonBlob = COMMON_BLOB()
        self.StartCryptoBlob = v_uint32(bigend=True)
        self.TotalLength = v_uint32(bigend=True)
        self.IV = v_bytes(size=8)
        self.Padding = v_bytes(size=0)
        self.EncryptedKey = v_bytes(size=0)

    def pcb_StartCryptoBlob(self):
        # 0x18 = bytes of fixed fields above (CommonBlob + two uint32 + IV)
        self['Padding'].vsSetLength(int(self.StartCryptoBlob) - 0x18)

    def pcb_TotalLength(self):
        self['EncryptedKey'].vsSetLength(int(self.TotalLength) - int(self.StartCryptoBlob))

    def decrypt(self, keychain):
        '''
        unwrap the key in two 3DES-CBC passes keyed by `keychain.db_key`.

        Returns:
          dict: `{'plaintext': <key bytes>}`.

        Raises:
          ValueError: on bad padding, or unexpected plaintext length/header.
        '''
        # first pass uses a fixed IV.
        des3a = DES3.new(keychain.db_key, DES3.MODE_CBC, binascii.unhexlify('4adda22c79e82105'))
        p1 = unpad(des3a.decrypt(self.EncryptedKey), 8)

        des3b = DES3.new(keychain.db_key, DES3.MODE_CBC, self.IV)
        # the ciphertext is the first 32 bytes, reversed
        p2 = unpad(des3b.decrypt(p1[:0x20][::-1]), 8)

        # example plaintext:
        #
        # 00000000: 00 00 00 00 C1 3D 0F F9  CB AC 6D AC D6 40 3A 98  .....=....m..@:.
        # 00000010: 4B 3C 5C F4 E8 12 F0 3E  CB 31 83 6C              K<\....>.1.l
        if len(p2) != 0x1C:
            raise ValueError('unexpected plaintext length')

        if p2[:4] != b'\x00\x00\x00\x00':
            raise ValueError('unexpected plaintext header')

        return {
            'plaintext': p2[4:],
        }


def _decrypt_ssgp_secret(blob, keychain):
    '''
    decrypt the payload of an SSGP-labelled password blob.

    shared by the generic and internet password blobs, which have the same layout.

    Returns:
      dict: `{'plaintext': ...}`, or `{}` when the blob has no encrypted payload.
    '''
    keyid = blob.SSGP.Label
    key = keychain.get_symmetric_key(keyid)

    if not blob.EncryptedKey:
        # its possible for there to be no encrypted key,
        # e.g. the BlobSize is 0x1C, which only leaves space for:
        #   SSGP magic
        #   SSGP label
        #   IV
        # some entries for `Microsoft Office Identities Cache 3` look like this.
        # TODO: figure out how to interpret this.
        return {}

    des3 = DES3.new(key, DES3.MODE_CBC, blob.IV)
    plaintext = unpad(des3.decrypt(blob.EncryptedKey), 8)
    return {
        'plaintext': plaintext
    }


class GENERIC_PASSWORD_BLOB(vstruct.VStruct):
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.SSGP = SSGP()
        self.IV = v_bytes(size=0x8)
        self.EncryptedKey = v_greedy_bytes()

    def decrypt(self, keychain):
        return _decrypt_ssgp_secret(self, keychain)


class INTERNET_PASSWORD_BLOB(vstruct.VStruct):
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.SSGP = SSGP()
        self.IV = v_bytes(size=0x8)
        self.EncryptedKey = v_greedy_bytes()

    def decrypt(self, keychain):
        # an empty payload is handled like the generic password case rather
        # than letting `unpad` fail on zero-length input.
        return _decrypt_ssgp_secret(self, keychain)


class PUBLIC_KEY_BLOB(vstruct.VStruct):
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.CommonBlob = COMMON_BLOB()
        self.StartCryptoBlob = v_uint32(bigend=True)
        self.TotalLength = v_uint32(bigend=True)
        self.Padding = v_bytes(size=0)
        self.PublicKey = v_bytes(size=0)

    def pcb_StartCryptoBlob(self):
        # 0x10 = sizeof(CommonBlob + StartCryptoBlob + TotalLength)
        self['Padding'].vsSetLength(int(self.StartCryptoBlob) - 0x10)

    def pcb_TotalLength(self):
        self['PublicKey'].vsSetLength(int(self.TotalLength) - int(self.StartCryptoBlob))

    def decrypt(self, keychain):
        # public keys are stored in the clear; nothing to decrypt.
        return {
            'plaintext': self.PublicKey,
        }


class PRIVATE_KEY_BLOB(vstruct.VStruct):
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.CommonBlob = COMMON_BLOB()
        self.StartCryptoBlob = v_uint32(bigend=True)
        self.TotalLength = v_uint32(bigend=True)
        self.IV = v_bytes(size=8)
        self.Padding = v_bytes(size=0)
        self.EncryptedKey = v_bytes(size=0)

    def pcb_StartCryptoBlob(self):
        self['Padding'].vsSetLength(int(self.StartCryptoBlob) - 0x18)

    def pcb_TotalLength(self):
        self['EncryptedKey'].vsSetLength(int(self.TotalLength) - int(self.StartCryptoBlob))

    def decrypt(self, keychain):
        '''
        unwrap the private key in two 3DES-CBC passes keyed by `keychain.db_key`.

        Returns:
          dict: `{'plaintext': <key bytes>}`.
        '''
        des3a = DES3.new(keychain.db_key, DES3.MODE_CBC, binascii.unhexlify('4adda22c79e82105'))
        p1 = unpad(des3a.decrypt(self.EncryptedKey), 8)

        des3b = DES3.new(keychain.db_key, DES3.MODE_CBC, self.IV)
        # the ciphertext is the entire first-pass output, reversed
        # (unlike the symmetric key blob, it is not truncated to 32 bytes).
        p2 = unpad(des3b.decrypt(p1[::-1]), 8)

        return {
            'plaintext': p2,
        }


class X509_CERTIFICATE_BLOB(vstruct.VStruct):
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.Certificate = v_greedy_bytes()

    def decrypt(self, keychain):
        return {
            'plaintext': self.Certificate,
        }


# maps table id to the class used to parse a record's blob.
# `NotImplemented` entries are not callable, so instantiating one raises TypeError.
BLOB_PARSERS = {
    # schema structure is stored in attributes.
    CSSM_DL_DB.SCHEMA_INFO: EMPTY_BLOB,
    CSSM_DL_DB.SCHEMA_INDEXES: EMPTY_BLOB,
    CSSM_DL_DB.SCHEMA_ATTRIBUTES: EMPTY_BLOB,
    CSSM_DL_DB.SCHEMA_PARSING_MODULE: EMPTY_BLOB,

    CSSM_DL_DB.RECORD_ANY: NotImplemented,
    CSSM_DL_DB.RECORD_CERT: NotImplemented,
    CSSM_DL_DB.RECORD_CRL: NotImplemented,
    CSSM_DL_DB.RECORD_POLICY: NotImplemented,
    CSSM_DL_DB.RECORD_GENERIC: NotImplemented,
    CSSM_DL_DB.RECORD_PUBLIC_KEY: PUBLIC_KEY_BLOB,
    CSSM_DL_DB.RECORD_PRIVATE_KEY: PRIVATE_KEY_BLOB,
    CSSM_DL_DB.RECORD_SYMMETRIC_KEY: SYMMETRIC_KEY_BLOB,
    CSSM_DL_DB.RECORD_ALL_KEY: NotImplemented,

    CSSM_DL_DB.RECORD_GENERIC_PASSWORD: GENERIC_PASSWORD_BLOB,
    CSSM_DL_DB.RECORD_INTERNET_PASSWORD: INTERNET_PASSWORD_BLOB,
    CSSM_DL_DB.RECORD_APPLESHARE_PASSWORD: NotImplemented,
    CSSM_DL_DB.RECORD_USER_TRUST: NotImplemented,
    CSSM_DL_DB.RECORD_X509_CRL: NotImplemented,
    CSSM_DL_DB.RECORD_UNLOCK_REFERRAL: NotImplemented,
    CSSM_DL_DB.RECORD_EXTENDED_ATTRIBUTE: NotImplemented,
    CSSM_DL_DB.RECORD_X509_CERTIFICATE: X509_CERTIFICATE_BLOB,
    CSSM_DL_DB.RECORD_METADATA: DB_BLOB,
}


CSSM_DB_ATTRIBUTE_FORMAT = v_enum()
CSSM_DB_ATTRIBUTE_FORMAT.STRING = 0
CSSM_DB_ATTRIBUTE_FORMAT.SINT32 = 1
CSSM_DB_ATTRIBUTE_FORMAT.UINT32 = 2
CSSM_DB_ATTRIBUTE_FORMAT.BIG_NUM = 3
CSSM_DB_ATTRIBUTE_FORMAT.REAL = 4
CSSM_DB_ATTRIBUTE_FORMAT.TIME_DATE = 5
CSSM_DB_ATTRIBUTE_FORMAT.BLOB = 6
CSSM_DB_ATTRIBUTE_FORMAT.MULTI_UINT32 = 7
CSSM_DB_ATTRIBUTE_FORMAT.COMPLEX = 8


class STRING(vstruct.VStruct):
    '''
    length-prefixed string attribute.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.length = v_uint32(bigend=True)
        self.data = v_str()

    def pcb_length(self):
        self['data'].vsSetLength(int(self.length))

    def __str__(self):
        return str(self.data).rstrip('\x00')

    def __repr__(self):
        return repr(self.data).rstrip('\x00')


# attribute parsers are zero-argument factories; see `ATTRIBUTE_PARSERS`.
SINT32 = lambda: v_int32(bigend=True)
UINT32 = lambda: v_uint32(bigend=True)
BIG_NUM = NotImplemented
REAL = lambda: v_double(bigend=True)
class TIME_DATE(vstruct.VStruct):
    '''
    16-byte timestamp attribute: ASCII `YYYYMMDDhhmmss` followed by `Z\\x00`.

    values that cannot be interpreted are represented as `datetime.datetime.min`.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.data = v_bytes(size=0x10)
        # the parsed datetime is kept under the key 'it'.
        # NOTE(review): presumably boxed in a dict so that assigning it later
        # does not go through VStruct's field handling - confirm.
        self._ts = {}

    @property
    def ts(self):
        return self._ts['it']

    def pcb_data(self):
        try:
            year = int(self.data[0:4])
            month = int(self.data[4:6])
            day = int(self.data[6:8])
            hour = int(self.data[8:10])
            minute = int(self.data[10:12])
            second = int(self.data[12:14])
        except ValueError:
            # non-numeric digits: fall back to the sentinel value.
            self._ts['it'] = datetime.datetime.min
            return

        z = self.data[14:16]
        if z == b'Z\x00':
            # TODO: set tz
            self._ts['it'] = datetime.datetime(year, month, day, hour, minute, second, tzinfo=pytz.utc)
        else:
            self._ts['it'] = datetime.datetime.min

    def __repr__(self):
        # drop tzinfo before formatting: an aware datetime would render its own
        # `+00:00` offset, and appending 'Z' to that yields a doubled designator.
        return self.ts.replace(tzinfo=None).isoformat('T') + 'Z'


def is_ascii(s):
    '''
    return True when every element of `s` is in `string.printable`.
    '''
    if sys.version_info[0] < 3:
        return all(c in string.printable for c in s)
    else:
        # iterating bytes yields ints on python 3.
        return all(chr(c) in string.printable for c in s)


def is_printable(buf):
    '''
    return True when the portion of `buf` before the first NUL decodes as
    UTF-8, encodes as ASCII, and consists only of printable characters.

    note the whole buffer must be valid UTF-8, including bytes after the NUL.
    '''
    try:
        s = buf.decode('utf-8').partition('\x00')[0].encode('ascii')
    except (UnicodeDecodeError, UnicodeEncodeError):
        return False
    else:
        return is_ascii(s)


class BLOB(vstruct.VStruct):
    '''
    length-prefixed binary attribute.

    renders as text when printable, otherwise as `hex:<hexdigits>`.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.length = v_uint32(bigend=True)
        self.data = v_bytes()

    def pcb_length(self):
        self['data'].vsSetLength(int(self.length))

    def __str__(self):
        if is_printable(self.data):
            return self.data.decode('utf-8').partition('\x00')[0]
        else:
            return 'hex:' + binascii.hexlify(self.data).decode('ascii')

    def __repr__(self):
        return str(self)


MULTI_UINT32 = NotImplemented
COMPLEX = NotImplemented


# maps attribute format to a zero-argument factory for its parser.
# `NotImplemented` entries are not callable, so instantiating one raises TypeError.
ATTRIBUTE_PARSERS = {
    CSSM_DB_ATTRIBUTE_FORMAT.STRING: STRING,
    CSSM_DB_ATTRIBUTE_FORMAT.SINT32: SINT32,
    CSSM_DB_ATTRIBUTE_FORMAT.UINT32: UINT32,
    CSSM_DB_ATTRIBUTE_FORMAT.BIG_NUM: BIG_NUM,
    CSSM_DB_ATTRIBUTE_FORMAT.REAL: REAL,
    CSSM_DB_ATTRIBUTE_FORMAT.TIME_DATE: TIME_DATE,
    CSSM_DB_ATTRIBUTE_FORMAT.BLOB: BLOB,
    CSSM_DB_ATTRIBUTE_FORMAT.MULTI_UINT32: MULTI_UINT32,
    CSSM_DB_ATTRIBUTE_FORMAT.COMPLEX: COMPLEX,
}


CSSM_KEYCLASS = v_enum()
CSSM_KEYCLASS.PUBLIC_KEY = 0x00+0x0F
CSSM_KEYCLASS.PRIVATE_KEY = 0x01+0x0F
CSSM_KEYCLASS.SESSION_KEY = 0x02+0x0F
CSSM_KEYCLASS.SECRET_PART = 0x03+0x0F
CSSM_KEYCLASS.OTHER = 0xFFFFFFFF


CSSM_ALGID = v_enum()
CSSM_ALGID.NONE = 0
CSSM_ALGID.CUSTOM = 1
CSSM_ALGID.DH = 2
CSSM_ALGID.PH = 3
CSSM_ALGID.KEA = 4
CSSM_ALGID.MD2 = 5
CSSM_ALGID.MD4 = 6
CSSM_ALGID.MD5 = 7
CSSM_ALGID.SHA1 = 8
CSSM_ALGID.NHASH = 9
CSSM_ALGID.HAVAL = 10
CSSM_ALGID.RIPEMD = 11
CSSM_ALGID.IBCHASH = 12
CSSM_ALGID.RIPEMAC = 13
CSSM_ALGID.DES = 14
CSSM_ALGID.DESX = 15
CSSM_ALGID.RDES = 16
CSSM_ALGID.THREEDES_3KEY_EDE = 17
CSSM_ALGID.THREEDES_2KEY_EDE = 18
CSSM_ALGID.THREEDES_1KEY_EEE = 19
CSSM_ALGID.THREEDES_3KEY_EEE = 20
CSSM_ALGID.THREEDES_2KEY_EEE = 21
CSSM_ALGID.IDEA = 22
CSSM_ALGID.RC2 = 23
CSSM_ALGID.RC5 = 24
CSSM_ALGID.RC4 = 25
CSSM_ALGID.SEAL = 26
CSSM_ALGID.CAST = 27
CSSM_ALGID.BLOWFISH = 28
CSSM_ALGID.SKIPJACK = 29
CSSM_ALGID.LUCIFER = 30
CSSM_ALGID.MADRYGA = 31
CSSM_ALGID.FEAL = 32
CSSM_ALGID.REDOC = 33
CSSM_ALGID.REDOC3 = 34
CSSM_ALGID.LOKI = 35
CSSM_ALGID.KHUFU = 36
CSSM_ALGID.KHAFRE = 37
CSSM_ALGID.MMB = 38
CSSM_ALGID.GOST = 39
CSSM_ALGID.SAFER = 40
CSSM_ALGID.CRAB = 41
CSSM_ALGID.RSA = 42
CSSM_ALGID.DSA = 43
CSSM_ALGID.MD5WithRSA = 44
CSSM_ALGID.MD2WithRSA = 45
CSSM_ALGID.ElGamal = 46
CSSM_ALGID.MD2Random = 47
CSSM_ALGID.MD5Random = 48
CSSM_ALGID.SHARandom = 49
CSSM_ALGID.DESRandom = 50
CSSM_ALGID.SHA1WithRSA = 51
CSSM_ALGID.CDMF = 52
CSSM_ALGID.CAST3 = 53
CSSM_ALGID.CAST5 = 54
CSSM_ALGID.GenericSecret = 55
CSSM_ALGID.ConcatBaseAndKey = 56
CSSM_ALGID.ConcatKeyAndBase = 57
CSSM_ALGID.ConcatBaseAndData = 58
CSSM_ALGID.ConcatDataAndBase = 59
CSSM_ALGID.XORBaseAndData = 60
CSSM_ALGID.ExtractFromKey = 61
CSSM_ALGID.SSL3PreMasterGen = 62
CSSM_ALGID.SSL3MasterDerive = 63
CSSM_ALGID.SSL3KeyAndMacDerive = 64
CSSM_ALGID.SSL3MD5_MAC = 65
CSSM_ALGID.SSL3SHA1_MAC = 66
CSSM_ALGID.PKCS5_PBKDF1_MD5 = 67
CSSM_ALGID.PKCS5_PBKDF1_MD2 = 68
CSSM_ALGID.PKCS5_PBKDF1_SHA1 = 69
CSSM_ALGID.WrapLynks = 70
CSSM_ALGID.WrapSET_OAEP = 71
CSSM_ALGID.BATON = 72
CSSM_ALGID.ECDSA = 73
CSSM_ALGID.MAYFLY = 74
CSSM_ALGID.JUNIPER = 75
CSSM_ALGID.FASTHASH = 76
CSSM_ALGID.THREEDES = 77
CSSM_ALGID.SSL3MD5 = 78
CSSM_ALGID.SSL3SHA1 = 79
CSSM_ALGID.FortezzaTimestamp = 80
CSSM_ALGID.SHA1WithDSA = 81
CSSM_ALGID.SHA1WithECDSA = 82
CSSM_ALGID.DSA_BSAFE = 83
CSSM_ALGID.ECDH = 84
CSSM_ALGID.ECMQV = 85
CSSM_ALGID.PKCS12_SHA1_PBE = 86
CSSM_ALGID.ECNRA = 87
CSSM_ALGID.SHA1WithECNRA = 88
CSSM_ALGID.ECES = 89
CSSM_ALGID.ECAES = 90
CSSM_ALGID.SHA1HMAC = 91
CSSM_ALGID.FIPS186Random = 92
CSSM_ALGID.ECC = 93
CSSM_ALGID.MQV = 94
CSSM_ALGID.NRA = 95
CSSM_ALGID.IntelPlatformRandom = 96
CSSM_ALGID.UTC = 97
CSSM_ALGID.HAVAL3 = 98
CSSM_ALGID.HAVAL4 = 99
CSSM_ALGID.HAVAL5 = 100
CSSM_ALGID.TIGER = 101
CSSM_ALGID.MD5HMAC = 102
CSSM_ALGID.PKCS5_PBKDF2 = 103
CSSM_ALGID.RUNNING_COUNTER = 104


class TABLE_HEADER(vstruct.VStruct):
    '''
    header at the start of each table, ending in the array of record offsets.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.TableSize = v_uint32(bigend=True)
        self.TableId = v_uint32(bigend=True, enum=CSSM_DL_DB)
        # number of offset entries with LSB not set (valid offset)
        # (contrast to `TotalRowCount`)
        self.AllocatedRowCount = v_uint32(bigend=True)
        self.Records = v_uint32(bigend=True)
        self.IndexesOffset = v_uint32(bigend=True)
        self.FreeListHead = v_uint32(bigend=True)
        # total number of offset entries
        # (contrast to `AllocatedRowCount`)
        self.TotalRowCount = v_uint32(bigend=True)
        self.RecordOffsets = vstruct.VArray()

    def pcb_TotalRowCount(self):
        # one offset slot per row, allocated or not.
        for _ in range(self.TotalRowCount):
            self.RecordOffsets.vsAddElement(v_uint32(bigend=True))


class Table:
    def __init__(self, db, buf):
        '''
        Args:
          db (Database): the database that owns this table.
          buf (bytes): the data to parse for this table.
        '''
        self.db = db
        self.buf = buf

        self.header = TABLE_HEADER()
        self.header.vsParse(buf)

    def get_records(self):
        '''
        yield a `Record` for each allocated row of this table.

        rows whose offset is zero or has its LSB set, and rows with a zero
        length, are skipped. rows that fail to parse with ValueError are
        logged and skipped.
        '''
        logger.debug('get_records for %s, %d rows total, %d rows allocated',
                     CSSM_DL_DB.vsReverseMapping(int(self.header.TableId)),
                     self.header.TotalRowCount,
                     self.header.AllocatedRowCount)

        schema = self.db.get_table_schema(int(self.header.TableId))

        for i in range(self.header.TotalRowCount):
            record_offset = int(self.header.RecordOffsets[i])

            if record_offset & 0b1 > 0:
                # if LSB is set, then record is invalid/unallocated
                continue

            if record_offset == 0x0:
                continue

            # the record starts with its own big-endian length.
            # `unpack_from` reads in place and accepts both bytes and memoryview.
            record_length = struct.unpack_from('>I', self.buf, record_offset)[0]
            if record_length == 0x0:
                continue

            # hand the parser a real bytes object, even when `buf` is a memoryview.
            record_buf = bytes(self.buf[record_offset:record_offset + record_length])

            try:
                record = Record(schema, record_buf)
            except ValueError as e:
                logger.warning('failed to parse record (table: %s, record: %s): %s', self.header.TableId, i, e)
                continue

            yield record


class APPL_DB_SCHEMA(vstruct.VStruct):
    '''
    schema header: a count followed by one offset per table.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.SchemaSize = v_uint32(bigend=True)
        self.TableCount = v_uint32(bigend=True)
        self.TableOffsets = vstruct.VArray()

    def pcb_TableCount(self):
        for _ in range(self.TableCount):
            self.TableOffsets.vsAddElement(v_uint32(bigend=True))


# via: http://mirror.informatimago.com/next/developer.apple.com/documentation/Security/Reference/keychainservices/Reference/reference.html
# each value is a four-character code interpreted as a big-endian uint32.
SecItemAttr = v_enum()
SecItemAttr.CreationDate = struct.unpack('>I', struct.pack('>4s', b'cdat'))[0]
SecItemAttr.ModDate = struct.unpack('>I', struct.pack('>4s', b'mdat'))[0]
SecItemAttr.Description = struct.unpack('>I', struct.pack('>4s', b'desc'))[0]
SecItemAttr.Comment = struct.unpack('>I', struct.pack('>4s', b'icmt'))[0]
SecItemAttr.Creator = struct.unpack('>I', struct.pack('>4s', b'crtr'))[0]
SecItemAttr.Type = struct.unpack('>I', struct.pack('>4s', b'type'))[0]
# four-character codes, read as big-endian unsigned integers.
SecItemAttr.ScriptCode = int.from_bytes(b'scrp', 'big')
SecItemAttr.Label = int.from_bytes(b'labl', 'big')
SecItemAttr.Invisible = int.from_bytes(b'invi', 'big')
SecItemAttr.Negative = int.from_bytes(b'nega', 'big')
SecItemAttr.CustomIcon = int.from_bytes(b'cusi', 'big')
SecItemAttr.Account = int.from_bytes(b'acct', 'big')
SecItemAttr.Service = int.from_bytes(b'svce', 'big')
SecItemAttr.Generic = int.from_bytes(b'gena', 'big')
SecItemAttr.SecurityDomain = int.from_bytes(b'sdmn', 'big')
SecItemAttr.Server = int.from_bytes(b'srvr', 'big')
SecItemAttr.AuthenticationType = int.from_bytes(b'atyp', 'big')
SecItemAttr.Port = int.from_bytes(b'port', 'big')
SecItemAttr.Path = int.from_bytes(b'path', 'big')
SecItemAttr.Volume = int.from_bytes(b'vlme', 'big')
SecItemAttr.Address = int.from_bytes(b'addr', 'big')
SecItemAttr.Signature = int.from_bytes(b'ssig', 'big')
SecItemAttr.Protocol = int.from_bytes(b'ptcl', 'big')
SecItemAttr.CertificateType = int.from_bytes(b'ctyp', 'big')
SecItemAttr.CertificateEncoding = int.from_bytes(b'cenc', 'big')
SecItemAttr.CrlType = int.from_bytes(b'crtp', 'big')
SecItemAttr.CrlEncoding = int.from_bytes(b'crnc', 'big')
SecItemAttr.Alias = int.from_bytes(b'alis', 'big')


# certificate type codes; the numbering has gaps (0x07 and 0x0B are not defined here).
CSSM_CERT = v_enum()
CSSM_CERT.UNKNOWN = 0x00
CSSM_CERT.X_509v1 = 0x01
CSSM_CERT.X_509v2 = 0x02
CSSM_CERT.X_509v3 = 0x03
CSSM_CERT.PGP = 0x04
CSSM_CERT.SPKI = 0x05
CSSM_CERT.SDSIv1 = 0x06
CSSM_CERT.Intel = 0x08
CSSM_CERT.X_509_ATTRIBUTE = 0x09
CSSM_CERT.X9_ATTRIBUTE = 0x0A
CSSM_CERT.ACL_ENTRY = 0x0C
CSSM_CERT.MULTIPLE = 0x7FFE
CSSM_CERT.LAST = 0x7FFF
CSSM_CERT.CUSTOM = 0x8000


# certificate encoding codes.
CSSM_CERT_ENCODING = v_enum()
CSSM_CERT_ENCODING.UNKNOWN = 0x00
CSSM_CERT_ENCODING.CUSTOM = 0x01
CSSM_CERT_ENCODING.BER = 0x02
CSSM_CERT_ENCODING.DER = 0x03
CSSM_CERT_ENCODING.NDR = 0x04
CSSM_CERT_ENCODING.SEXPR = 0x05
CSSM_CERT_ENCODING.PGP = 0x06
CSSM_CERT_ENCODING.MULTIPLE = 0x7FFE
CSSM_CERT_ENCODING.LAST = 0x7FFF


# unlike the other enums, these values are kept as the raw four bytes.
SecAuthenticationType = v_enum()
SecAuthenticationType.NTLM = b'ntlm'
SecAuthenticationType.MSN = b'msna'
SecAuthenticationType.DPA = b'dpaa'
SecAuthenticationType.RPA = b'rpaa'
SecAuthenticationType.HTTPBasic = b'http'
SecAuthenticationType.HTTPDigest = b'httd'
SecAuthenticationType.HTMLForm = b'form'
SecAuthenticationType.Default = b'dflt'
SecAuthenticationType.Any = b'\x00\x00\x00\x00'


# four-character codes, read as big-endian unsigned integers.
SecProtocolType = v_enum()
SecProtocolType.FTP = int.from_bytes(b'ftp ', 'big')
SecProtocolType.FTPAccount = int.from_bytes(b'ftpa', 'big')
SecProtocolType.HTTP = int.from_bytes(b'http', 'big')
SecProtocolType.IRC = int.from_bytes(b'irc ', 'big')
SecProtocolType.NNTP = int.from_bytes(b'nntp', 'big')
SecProtocolType.POP3 = int.from_bytes(b'pop3', 'big')
SecProtocolType.SMTP = int.from_bytes(b'smtp', 'big')
SecProtocolType.SOCKS = int.from_bytes(b'sox ', 'big')
SecProtocolType.IMAP = int.from_bytes(b'imap', 'big')
SecProtocolType.LDAP = int.from_bytes(b'ldap', 'big')
SecProtocolType.AppleTalk = int.from_bytes(b'atlk', 'big')
SecProtocolType.AFP = int.from_bytes(b'afp ', 'big')
SecProtocolType.Telnet = int.from_bytes(b'teln', 'big')
SecProtocolType.SSH = int.from_bytes(b'ssh ', 'big')
SecProtocolType.FTPS = int.from_bytes(b'ftps', 'big')
SecProtocolType.HTTPS = int.from_bytes(b'htps', 'big')
SecProtocolType.HTTPProxy = struct.unpack('>I', struct.pack('>4s', b'htpx'))[0]
SecProtocolType.HTTPSProxy = struct.unpack('>I', struct.pack('>4s', b'htsx'))[0]
SecProtocolType.FTPProxy = struct.unpack('>I', struct.pack('>4s', b'ftpx'))[0]
SecProtocolType.CIFS = struct.unpack('>I', struct.pack('>4s', b'cifs'))[0]
SecProtocolType.SMB = struct.unpack('>I', struct.pack('>4s', b'smb '))[0]
SecProtocolType.RTSP = struct.unpack('>I', struct.pack('>4s', b'rtsp'))[0]
SecProtocolType.RTSPProxy = struct.unpack('>I', struct.pack('>4s', b'rtsx'))[0]
SecProtocolType.DAAP = struct.unpack('>I', struct.pack('>4s', b'daap'))[0]
SecProtocolType.EPPC = struct.unpack('>I', struct.pack('>4s', b'eppc'))[0]
SecProtocolType.IPP = struct.unpack('>I', struct.pack('>4s', b'ipp '))[0]
SecProtocolType.NNTPS = struct.unpack('>I', struct.pack('>4s', b'ntps'))[0]
SecProtocolType.LDAPS = struct.unpack('>I', struct.pack('>4s', b'ldps'))[0]
SecProtocolType.TelnetS = struct.unpack('>I', struct.pack('>4s', b'tels'))[0]
SecProtocolType.IMAPS = struct.unpack('>I', struct.pack('>4s', b'imps'))[0]
SecProtocolType.IRCS = struct.unpack('>I', struct.pack('>4s', b'ircs'))[0]
SecProtocolType.POP3S = struct.unpack('>I', struct.pack('>4s', b'pops'))[0]
SecProtocolType.CVSpserver = struct.unpack('>I', struct.pack('>4s', b'cvsp'))[0]
# 'svn ' is its own protocol; it previously overwrote `CVSpserver`.
SecProtocolType.SVN = struct.unpack('>I', struct.pack('>4s', b'svn '))[0]
SecProtocolType.AdiumMessenger = struct.unpack('>I', struct.pack('>4s', b'AdIM'))[0]
SecProtocolType.Any = struct.unpack('>I', struct.pack('>4s', b'\x00\x00\x00\x00'))[0]


class Database:
    '''
    the collection of tables found at the keychain's schema offset.
    '''
    def __init__(self, buf):
        '''
        Args:
          buf: keychain data starting at the schema.

        Raises:
          ValueError: when two tables share the same table id.
        '''
        self.buf = buf

        self.schema = APPL_DB_SCHEMA()
        self.schema.vsParse(buf)

        tables = [
            self._get_table_by_index(i)
            for i in range(self.schema.TableCount)
        ]

        self.tables = {}
        for table in tables:
            if table.header.TableId in self.tables:
                raise ValueError("duplicate tables with id: " + hex(table.header.TableId))
            self.tables[table.header.TableId] = table

    def _get_table_by_index(self, index):
        # table offsets are relative to the start of the schema buffer.
        table_offset = self.schema.TableOffsets[index]
        table_buf = self.buf[table_offset:]
        return Table(self, table_buf)

    def select(self, table, record_index=None, limit=sys.maxsize):
        '''
        yield up to `limit` records from the table with the given id.

        `record_index` is accepted but currently unused.

        Raises:
          KeyError: when no table has the given id.
        '''
        table = self.tables[table]
        for r in itertools.islice(table.get_records(), min(limit, table.header.TotalRowCount)):
            yield r

    def get_table_schema(self, table):
        '''
        return the schema (relation info plus `attrs` list) for a table id.

        Raises:
          KeyError: when the schema tables do not describe the given table.
        '''
        # the schema is self-describing, so you can inspect the schema itself.
        # we provide hardcoded definitions of these tables to bootstrap the schema.
        logger.debug('fetching schema for 0x%x', table)

        if table == CSSM_DL_DB.SCHEMA_INFO:
            return {
                'RelationID': CSSM_DL_DB.SCHEMA_INFO,
                'RelationName': 'CSSM_DL_DB_SCHEMA_INFO',
                'attrs': [
                    {'AttributeFormat': CSSM_DB_ATTRIBUTE_FORMAT.UINT32,
                     'AttributeID': 0,
                     'AttributeName': 'RelationID',
                     'AttributeNameFormat': CSSM_DB_ATTRIBUTE_FORMAT.STRING,
                     'RelationID': CSSM_DL_DB.SCHEMA_INFO},
                    {'AttributeFormat': CSSM_DB_ATTRIBUTE_FORMAT.STRING,
                     'AttributeID': 1,
                     'AttributeName': 'RelationName',
                     'AttributeNameFormat': CSSM_DB_ATTRIBUTE_FORMAT.STRING,
                     'RelationID': CSSM_DL_DB.SCHEMA_INFO},
                ],
            }
        elif table == CSSM_DL_DB.SCHEMA_ATTRIBUTES:
            return {
                'RelationID': CSSM_DL_DB.SCHEMA_ATTRIBUTES,
                'RelationName': 'CSSM_DL_DB_SCHEMA_ATTRIBUTES',
                'attrs': [
                    {'AttributeFormat': CSSM_DB_ATTRIBUTE_FORMAT.UINT32,
                     'AttributeID': 0,
                     'AttributeName': 'RelationID',
                     'AttributeNameFormat': CSSM_DB_ATTRIBUTE_FORMAT.STRING,
                     'RelationID': CSSM_DL_DB.SCHEMA_ATTRIBUTES},
                    {'AttributeFormat': CSSM_DB_ATTRIBUTE_FORMAT.UINT32,
                     'AttributeID': 1,
                     'AttributeName': 'AttributeID',
                     'AttributeNameFormat': CSSM_DB_ATTRIBUTE_FORMAT.STRING,
                     'RelationID': CSSM_DL_DB.SCHEMA_ATTRIBUTES},
                    {'AttributeFormat': CSSM_DB_ATTRIBUTE_FORMAT.UINT32,
                     'AttributeID': 2,
                     'AttributeName': 'AttributeNameFormat',
                     'AttributeNameFormat': CSSM_DB_ATTRIBUTE_FORMAT.STRING,
                     'RelationID': CSSM_DL_DB.SCHEMA_ATTRIBUTES},
                    {'AttributeFormat': CSSM_DB_ATTRIBUTE_FORMAT.STRING,
                     'AttributeID': 3,
                     'AttributeName': 'AttributeName',
                     'AttributeNameFormat': CSSM_DB_ATTRIBUTE_FORMAT.STRING,
                     'RelationID': CSSM_DL_DB.SCHEMA_ATTRIBUTES},
                    {'AttributeFormat': CSSM_DB_ATTRIBUTE_FORMAT.BLOB,
                     'AttributeID': 4,
                     'AttributeName': 'AttributeNameID',
                     'AttributeNameFormat': CSSM_DB_ATTRIBUTE_FORMAT.STRING,
                     'RelationID': CSSM_DL_DB.SCHEMA_ATTRIBUTES},
                    {'AttributeFormat': CSSM_DB_ATTRIBUTE_FORMAT.UINT32,
                     'AttributeID': 5,
                     'AttributeName': 'AttributeFormat',
                     'AttributeNameFormat': CSSM_DB_ATTRIBUTE_FORMAT.STRING,
                     'RelationID': CSSM_DL_DB.SCHEMA_ATTRIBUTES},
                ],
            }

        for r in self.select(CSSM_DL_DB.SCHEMA_INFO):
            if int(r.attrs['RelationID']) != table:
                continue

            attrs = []
            for a in self.select(CSSM_DL_DB.SCHEMA_ATTRIBUTES):
                if int(a.attrs['RelationID']) != int(r.attrs['RelationID']):
                    continue

                if 'AttributeName' not in a.attrs:
                    # unnamed attribute: fall back to the well-known four-character code.
                    a.attrs['AttributeName'] = SecItemAttr.vsReverseMapping(int(a.attrs['AttributeID']), default='Unknown')
                else:
                    a.attrs['AttributeName'] = str(a.attrs['AttributeName'])

                attrs.append(a.attrs)

            ret = copy.copy(r.attrs)
            ret['attrs'] = attrs
            return ret

        raise KeyError('failed to find table')


class APPL_DB_HEADER(vstruct.VStruct):
    '''
    file header of the keychain database; validated by its `kych` signature.
    '''
    def __init__(self):
        vstruct.VStruct.__init__(self)
        self.Signature = v_bytes(size=4)
        self.MajorVersion = v_uint16()
        self.MinorVersion = v_uint16()
        self.HeaderSize = v_uint32(bigend=True)
        self.SchemaOffset = v_uint32(bigend=True)
        self.AuthOffset = v_uint32(bigend=True)

    def pcb_Signature(self):
        if self.Signature != b'kych':
            raise ValueError('invalid header signature')

    def pcb_Version(self):
        # NOTE(review): there is no field named `Version` in this structure, so
        # this callback does not correspond to any parsed field and the version
        # is effectively unchecked. confirm the intended field/value before
        # enabling it.
        if self.Version != 0x100:
            raise ValueError('unsupported version')


class Keychain:
    '''
    a parsed keychain file, unlocked with the given password.
    '''
    def __init__(self, buf, password):
        '''
        Args:
          buf: the complete keychain file contents.
          password: the keychain password, used to derive the master key.
        '''
        self.buf = buf
        self.password = password

        self.header = APPL_DB_HEADER()
        self.header.vsParse(buf)

        self.db = Database(buf[self.header.SchemaOffset:])

        keys = self.get_master_keys()
        self.master_key = keys['master_key']
        self.db_key = keys['db_key']

        # symmetric keys are looked up by SSGP label when decrypting passwords.
        self.symmetric_keys = {
            parse_ssgp_label(key['attrs']['Label']): key['plaintext']
            for key in self.get_symmetric_keys()
        }

    def get_decrypted_rows(self, table):
        '''
        yield one dict per record: its `attrs` plus whatever `blob.decrypt()` returns.
        '''
        for record in self.db.select(table):
            key = {
                'attrs': copy.copy(record.attrs),
            }
            key.update(record.blob.decrypt(self))
            yield key

    def get_master_keys(self):
        '''
        return the decrypted first row of the metadata table.

        Raises:
          ValueError: when the metadata table yields no rows.
        '''
        # index zero seems to be a magic constant.
        for row in self.get_decrypted_rows(CSSM_DL_DB.RECORD_METADATA):
            return row
        # raise a descriptive error instead of leaking StopIteration.
        raise ValueError('keychain has no metadata record')

    def get_symmetric_keys(self):
        for key in self.get_decrypted_rows(CSSM_DL_DB.RECORD_SYMMETRIC_KEY):
            yield key

    def get_symmetric_key(self, keyid):
        return self.symmetric_keys[keyid]


# these are the names of attributes that should be rendered as a boolean (true/false)
# this list is collected empirically, not from any database metadata.
BOOL_ATTRIBUTES = {
    'Permanent',
    'Private',
    'Modifiable',
    'Sensitive',
    'AlwaysSensitive',
    'Extractable',
    'NeverExtractable',
    'Encrypt',
    'Decrypt',
    'Derive',
    'Sign',
    'Verify',
    'SignRecover',
    'VerifyRecover',
    'Wrap',
    'Unwrap',
    'Invisible',
}


def render_cell(attr_name, attr_value):
    '''
    render one attribute value as text for the report, based on the attribute name.

    an empty string (missing attribute) renders as an empty cell.
    '''
    if attr_value == '':
        return ''
    elif attr_name in BOOL_ATTRIBUTES:
        if bool(int(attr_value)):
            return 'true'
        else:
            return 'false'
    elif attr_name == 'KeyClass':
        return CSSM_KEYCLASS.vsReverseMapping(int(attr_value))
    elif attr_name == 'KeyType':
        return CSSM_ALGID.vsReverseMapping(int(attr_value))
    elif attr_name == 'CertType':
        return CSSM_CERT.vsReverseMapping(int(attr_value))
    elif attr_name == 'CertEncoding':
        return CSSM_CERT_ENCODING.vsReverseMapping(int(attr_value))
    elif attr_name == 'AuthenticationType':
        # this enum is keyed by raw bytes rather than integers.
        return SecAuthenticationType.vsReverseMapping(attr_value.data)
    elif attr_name == 'Protocol':
        return SecProtocolType.vsReverseMapping(int(attr_value))
    elif attr_name == 'Port':
        # ports read best in decimal, unlike other numbers.
        return str(attr_value)
    elif isinstance(attr_value, v_number):
        return hex(attr_value).rstrip('L')
    else:
        return str(attr_value)


def render_plaintext(outdir, plaintext):
    '''
    render decrypted data for the report.

    short printable data is returned inline; anything else is written to
    `<outdir>/binary/<md5>` and a relative `file://` reference is returned.
    '''
    if is_printable(plaintext) and len(plaintext) < 64:
        # `is_printable` validates the buffer as UTF-8 and only checks the text
        # before the first NUL, so decode as UTF-8 here as well; decoding as
        # ASCII could fail on bytes after the NUL.
        return plaintext.decode('utf-8')
    else:
        md5 = hashlib.md5()
        md5.update(plaintext)

        outpath = os.path.join(outdir, 'binary', md5.hexdigest())
        logger.debug('writing binary blob to file %s', outpath)
        with open(outpath, 'wb') as f:
            f.write(plaintext)

        return 'file://' + os.path.join('binary', md5.hexdigest())


def render_table(keychain, table, outdir):
    '''
    render all decrypted rows of a table as a text table.

    a `plaintext` column is appended when at least one row decrypts to plaintext.
    '''
    table_name = CSSM_DL_DB.vsReverseMapping(int(table))
    logger.debug('rendering table %s', table_name)

    schema = keychain.db.get_table_schema(table)

    # decrypt once and reuse for both the column check and the rendering.
    decrypted = list(keychain.get_decrypted_rows(table))
    has_plaintext = any('plaintext' in row for row in decrypted)

    rows = []
    for i, row in enumerate(decrypted):
        logger.debug('table %s row %d', table_name, i)
        cells = [render_cell(attr['AttributeName'], row['attrs'].get(attr['AttributeName'], ''))
                 for attr in schema['attrs']]
        if has_plaintext:
            cells.append(render_plaintext(outdir, row.get('plaintext', b'')))
        rows.append(cells)

    headers = [attr['AttributeName'] for attr in schema['attrs']]
    if has_plaintext:
        headers.append('plaintext')

    return tabulate.tabulate(
        rows,
        headers=headers,
    )


def write_keychain_report(keychain, outdir):
    '''
    write `report.txt` and the `binary/` blob directory under `outdir`.

    tables that fail to render are logged and skipped (best effort).
    '''
    binary_dir = os.path.join(outdir, 'binary')
    logger.info('writing binary blobs into directory %s', binary_dir)
    if not os.path.exists(binary_dir):
        os.makedirs(binary_dir)

    report_path = os.path.join(outdir, 'report.txt')
    logger.info('writing report into file %s', report_path)
    with open(report_path, 'wb') as f:
        for tableid in sorted(keychain.db.tables.keys()):
            try:
                f.write(('%s TABLE %s %s\n' % ('#' * 20, CSSM_DL_DB.vsReverseMapping(int(tableid)), '#' * 20)).encode('utf-8'))
                f.write(render_table(keychain, tableid, outdir).encode('utf-8'))
                f.write('\n'.encode('utf-8'))
            except TypeError:
                # raised when a parser for this table is `NotImplemented`.
                logger.warning('table not supported: %s (submit to Willi for testing)',
                               CSSM_DL_DB.vsReverseMapping(int(tableid)))
            except Exception as e:
                # deliberate best effort: keep rendering the remaining tables.
                logger.warning('failed to render table %s: %s',
                               CSSM_DL_DB.vsReverseMapping(int(tableid)), e)


def main(argv=None):
    '''
    command line entry point.

    Args:
      argv (list[str]): arguments; defaults to `sys.argv[1:]`.

    Returns:
      int: process exit code (0 on success).
    '''
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="extract keys from macOS keychains")
    parser.add_argument("keychain", type=str,
                        help="Path to input keychain file")
    parser.add_argument("password", type=str,
                        help="Keychain password")
    parser.add_argument("output_directory", type=str,
                        help="Path into which to write binary data")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable debug logging")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disable all output but errors")
    args = parser.parse_args(args=argv)

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)
    elif args.quiet:
        logging.basicConfig(level=logging.ERROR)
        logging.getLogger().setLevel(logging.ERROR)
    else:
        logging.basicConfig(level=logging.INFO)
        logging.getLogger().setLevel(logging.INFO)

    if not os.path.exists(args.output_directory):
        os.makedirs(args.output_directory)

    with open(args.keychain, 'rb') as f:
        # memoryview keeps the many sub-slices taken during parsing copy-free.
        buf = memoryview(f.read())

    keychain = Keychain(buf, args.password)
    write_keychain_report(keychain, args.output_directory)

    return 0


if __name__ == "__main__":
    sys.exit(main())