Last active
May 17, 2023 12:26
-
-
Save lixian/e3ab20080c9b34a89bb91bc12b413e53 to your computer and use it in GitHub Desktop.
pixiu to beancount
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import io | |
| import re | |
| import pandas as pd | |
| from pypinyin import pinyin, Style | |
# Module-wide switch read by get_account_word(); main() sets it to True when
# the -p/--pinyin command-line flag is given.
use_pinyin = False
def n(*words):
    """Build a colon-separated account name from ``words``.

    Every word is passed through ``get_account_word`` before the pieces are
    joined with ``":"``.

    Args:
        *words: Account-name components, outermost first.

    Returns:
        str: The joined account name.
    """
    cleaned_parts = map(get_account_word, words)
    return ":".join(cleaned_parts)
def get_account_word(word):
    """Normalise one account-name component.

    Removes every ``+``, space and ``/`` from ``word`` and strips any
    remaining surrounding whitespace. When the module-level ``use_pinyin``
    flag is set, the result is additionally passed through ``to_pinyin``.

    Args:
        word: The raw component text.

    Returns:
        str: The cleaned (and optionally romanised) component.
    """
    cleaned = word
    for unwanted in ('+', ' ', '/'):
        cleaned = cleaned.replace(unwanted, '')
    cleaned = cleaned.strip()
    return to_pinyin(cleaned) if use_pinyin else cleaned
# Convert Chinese characters to capitalised, ASCII-only Pinyin
def to_pinyin(chinese_chars):
    """Romanise ``chinese_chars`` into a single Pinyin string.

    The first entry of each item returned by ``pinyin(..., Style.NORMAL)``
    is capitalised, the pieces are concatenated, and every character other
    than ASCII letters, digits and ``-`` is removed.

    Args:
        chinese_chars: Text to romanise.

    Returns:
        str: The concatenated, filtered Pinyin.
    """
    readings = pinyin(chinese_chars, style=Style.NORMAL)
    capitalised = [candidates[0].capitalize() for candidates in readings]
    joined = ''.join(capitalised)
    return re.sub('[^a-zA-Z0-9-]', '', joined)
# Maps the currency labels found in the CSV's 币种 column to the three-letter
# codes written into the Beancount output. Labels missing here are used as-is
# by convert_to_beancount().
currency_map = dict([
    ('人民币', 'CNY'),
    ('美元', 'USD'),
    ('港币', 'HKD'),
    # Add other currency mappings here...
])
def convert_to_beancount(df, args):
    """Render the transactions in ``df`` as Beancount directives.

    Rows are processed in the reverse of their order in ``df``. For each row
    an ``open`` directive is emitted the first time an account is used,
    followed by a two-posting transaction moving the row's amount from the
    source account to the destination account (as chosen by
    ``fetch_accounts``).

    Args:
        df: DataFrame providing the columns 日期, 交易分类, 收支大类, 交易类型,
            备注, 币种, 流入金额 and 流出金额 (plus 资金账户, which
            ``fetch_accounts`` reads).
        args: Parsed command-line arguments; currently unused.

    Returns:
        str: All generated directive lines joined with newlines.
    """
    beancount_data = []
    opened_accounts = []

    # Reverse the DataFrame to process transactions in ascending order by date
    # (assumes the export lists the newest transaction first - TODO confirm).
    for _, row in df.iloc[::-1].iterrows():
        date = row['日期']
        cat1 = row['交易分类']
        cat2 = row['收支大类']
        cat3 = row['交易类型']

        description = f"{cat1} {cat2} {cat3}"
        remark = row['备注']
        # Empty CSV cells arrive as NaN; only append a real remark.
        if not pd.isna(remark):
            description = f"{description}: {remark}"
        # The narration is written inside double quotes, so backslashes and
        # quotes coming from the data must be escaped to keep the line valid.
        description = description.replace('\\', '\\\\').replace('"', '\\"')

        currency = row['币种']
        currency = currency_map.get(currency, currency)

        # Treat empty amount cells as 0: max() with a NaN operand can return
        # NaN, which would otherwise be written out as the posting amount.
        amount_in = 0 if pd.isna(row['流入金额']) else row['流入金额']
        amount_out = 0 if pd.isna(row['流出金额']) else row['流出金额']
        amount = max(amount_in, amount_out)

        account_from, account_to = fetch_accounts(row, amount_in, amount_out, cat1, cat2, cat3)

        # Open the accounts if they have not been opened yet
        open_account(account_from, beancount_data, currency, date, opened_accounts)
        open_account(account_to, beancount_data, currency, date, opened_accounts)

        beancount_data.append(f"{date} * \"{description}\"")
        beancount_data.append(f" {account_from} -{amount} {currency}")
        beancount_data.append(f" {account_to} {amount} {currency}")

    return '\n'.join(beancount_data)
def fetch_accounts(row, amount_in, amount_out, cat1, cat2, cat3):
    """Choose the (source, destination) account pair for one transaction.

    Args:
        row: Mapping-like record; only its ``'资金账户'`` field is read (the
            whole row is printed when an amount cannot be categorised).
        amount_in: Inflow amount; only tested for ``> 0``.
        amount_out: Outflow amount; only tested for ``> 0``.
        cat1: Transaction class, compared against the known class labels.
        cat2: Second-level category; for daily income/expense rows it is
            replaced by ``cat3`` when it equals "无".
        cat3: Third-level category; for prepayment, lending, receivable and
            borrowing rows the counterparty is the text between 「 and 」.

    Returns:
        tuple: ``(account_from, account_to)`` account names.
    """
    fund = row['资金账户']

    # Transfers carry both sides in the fund-account field itself.
    if '→' in fund:
        source, target = fund.split('→')
        return n("Assets", source), n("Assets", target)

    # Every other transaction has a single fund account (or none at all).
    account = "Equity:None" if fund == "无" else n("Assets", fund)

    if cat1 == "日常收入":
        middle = cat3 if cat2 == "无" else cat2
        return n("Income", cat1, middle, cat3), account

    if cat1 == "日常支出":
        middle = cat3 if cat2 == "无" else cat2
        return account, n("Expenses", cat1, middle, cat3)

    if cat1 == "余额调整":
        is_inflow = cat2 == "系统收入"
        if cat3 == "余额调整":
            root = "Equity"
        elif is_inflow:
            root = "Income"
        else:
            root = "Expenses"
        counterpart = n(root, cat1, cat2, cat3)
        if is_inflow:
            return counterpart, account
        return account, counterpart

    if cat1 == "预付" or cat1 == "借出" or cat1 == "应收款":
        found = re.search(r'「(.*?)」', cat3)
        if found:
            return account, n("Assets", found.group(1))
        # No counterparty found: both sides stay on the fund account.
        print("error: no matched account")
        return account, account

    if cat1 == "借入":
        found = re.search(r'「(.*?)」', cat3)
        if found:
            return n("Liabilities", found.group(1)), account
        # No counterparty found: both sides stay on the fund account.
        print("error: no matched account")
        return account, account

    # Purchases that move money from the fund account into a fixed asset account.
    for label, asset in (("证券买入", "证券"), ("资产购入", "房产"), ("贵金属买入", "贵金属")):
        if cat1 == label:
            return account, n("Assets", asset)

    # Unrecognised class: fall back on the direction of the money flow and
    # print the row so it can be inspected.
    if amount_in > 0:
        print(row)
        return 'Income:Miscellaneous', account
    if amount_out > 0:
        print(row)
        return account, 'Expenses:Miscellaneous'

    # No amount either way: both sides stay on the fund account.
    return account, account
def open_account(account, beancount_data, currency, date, opened_accounts):
    """Emit an ``open`` directive for ``account`` the first time it is seen.

    Args:
        account: Account name to open.
        beancount_data: List of output lines; the directive is appended here.
        currency: Accepted for call-site symmetry; not used.
        date: Date text placed at the start of the directive.
        opened_accounts: List of accounts already opened; ``account`` is
            appended to it when a directive is emitted.
    """
    if account in opened_accounts:
        return
    opened_accounts.append(account)
    beancount_data.append(f"{date} open {account}")
# Save the Beancount data to a file
def save_to_beancount_file(beancount_data, file_name):
    """Write a fixed ledger header followed by ``beancount_data``.

    The header sets the ledger title, the operating currency (CNY), comma
    rendering, a fava language option and two price directives. A trailing
    newline is written after the data.

    Args:
        beancount_data: The directive text to store, as a single string.
        file_name: Path of the file to create or overwrite.
    """
    # Account names and narrations may contain Chinese text, so the encoding
    # is pinned to UTF-8 instead of depending on the platform default.
    with open(file_name, 'w', encoding='utf-8') as f:
        f.write("""option "title" "My Personal Ledger"
option "operating_currency" "CNY"
option "render_commas" "True"
2023-05-01 custom "fava-option" "language" "en"
2023-05-01 price USD 6.99 CNY
2023-05-01 price HKD 0.89 CNY
""")
        f.write(beancount_data)
        f.write("\n")
# main function to coordinate the process
def main():
    """Parse the command line, convert the CSV export and write the ledger.

    Options:
        -p / --pinyin: set the module-level ``use_pinyin`` flag so account
            names are romanised.
        -i / --input: path of the CSV file to read; defaults to the path
            that used to be hard-coded here.
        -o / --output: path of the Beancount file to write; defaults to
            ``pixiu.beancount``.
    """
    global use_pinyin

    # Create the parser
    parser = argparse.ArgumentParser(description="A simple script")

    # Add the arguments
    parser.add_argument('-p', '--pinyin', action='store_true', help="Enable pinyin mode")
    parser.add_argument(
        '-i', '--input',
        default='/Users/loki/iCloud/Documents/貔貅/貔貅记账#所有交易流水.csv',
        help="Path to the CSV file to convert",
    )
    parser.add_argument(
        '-o', '--output',
        default='pixiu.beancount',
        help="Path where the Beancount file is saved",
    )

    # Parse the arguments
    args = parser.parse_args()

    if args.pinyin:
        use_pinyin = True
        print("Pinyin mode is enabled")

    df = read_data(args.input)
    beancount_data = convert_to_beancount(df, args)
    save_to_beancount_file(beancount_data, args.output)
def read_data(csv_file):
    """Load the transaction CSV, dropping everything from the separator on.

    If a line consisting solely of the dash separator is present, that line
    and all lines after it are discarded before parsing; otherwise the whole
    file is parsed.

    Args:
        csv_file: Path of the CSV file to read.

    Returns:
        pandas.DataFrame: The parsed transaction table.
    """
    separator = '------------------------'

    # Decode explicitly instead of relying on the platform default encoding;
    # "utf-8-sig" accepts UTF-8 both with and without a byte-order mark.
    with open(csv_file, 'r', encoding='utf-8-sig') as file:
        lines = file.readlines()

    # Find the line containing the "---------" text
    index_of_separator = next(
        (i for i, line in enumerate(lines) if line.strip() == separator),
        None,
    )
    if index_of_separator is not None:
        # Remove the separator and the trailing lines from the list
        lines = lines[:index_of_separator]

    # Parse from memory in both cases so the file is read and decoded once.
    return pd.read_csv(io.StringIO(''.join(lines)))
# Run the conversion only when this file is executed as a script, so that
# importing the module does not trigger any file access.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment