Skip to content

Instantly share code, notes, and snippets.

@lixian
Last active May 17, 2023 12:26
Show Gist options
  • Save lixian/e3ab20080c9b34a89bb91bc12b413e53 to your computer and use it in GitHub Desktop.
Save lixian/e3ab20080c9b34a89bb91bc12b413e53 to your computer and use it in GitHub Desktop.
pixiu to beancount
import argparse
import io
import re
import pandas as pd
from pypinyin import pinyin, Style
use_pinyin = False
def n(*words):
return ":".join([get_account_word(word) for word in words])
def get_account_word(word):
trim = word.replace('+', '').replace(' ', '').replace('/', '').strip()
global use_pinyin
if use_pinyin:
trim = to_pinyin(trim)
return trim
# Function to convert Chinese characters to Pinyin
def to_pinyin(chinese_chars):
pinyin_chars = ''.join([i[0].capitalize() for i in pinyin(chinese_chars, style=Style.NORMAL)])
pinyin_chars = re.sub('[^a-zA-Z0-9-]', '', pinyin_chars)
return pinyin_chars
currency_map = {
'人民币': 'CNY',
'美元': 'USD',
'港币': 'HKD',
# Add other currency mappings here...
}
def convert_to_beancount(df, args):
beancount_data = []
opened_accounts = []
# Reverse the DataFrame to process transactions in ascending order by date
df = df.iloc[::-1]
for index, row in df.iterrows():
date = row['日期']
cat1 = row['交易分类']
cat2 = row['收支大类']
cat3 = row['交易类型']
if str(row['备注']) != "nan":
description = f"{cat1} {cat2} {cat3}: {row['备注']}"
else:
description = f"{cat1} {cat2} {cat3}"
currency = row['币种']
if currency in currency_map:
currency = currency_map[currency]
amount_in = row['流入金额']
amount_out = row['流出金额']
amount = max(amount_in, amount_out)
account_from, account_to = fetch_accounts(row, amount_in, amount_out, cat1, cat2, cat3)
# Open the accounts if they have not been opened yet
open_account(account_from, beancount_data, currency, date, opened_accounts)
open_account(account_to, beancount_data, currency, date, opened_accounts)
beancount_data.append(f"{date} * \"{description}\"")
beancount_data.append(f" {account_from} -{amount} {currency}")
beancount_data.append(f" {account_to} {amount} {currency}")
return '\n'.join(beancount_data)
def fetch_accounts(row, amount_in, amount_out, cat1, cat2, cat3):
# For transfer transactions
if '→' in row['资金账户']:
account_from, account_to = row['资金账户'].split('→')
account_from, account_to = n("Assets", account_from), n("Assets", account_to)
# For other transactions
else:
if row['资金账户'] == "无":
account = "Equity:None"
else:
account = n("Assets", row['资金账户'])
account_from, account_to = account, account
if cat1 == "日常收入":
if cat2 == "无":
cat2 = cat3
account_from, account_to = n("Income", cat1, cat2, cat3), account
elif cat1 == "日常支出":
if cat2 == "无":
cat2 = cat3
account_from, account_to = account, n("Expenses", cat1, cat2, cat3)
elif cat1 == "余额调整":
if cat2 == "系统收入":
if cat3 == "余额调整":
account_from, account_to = n("Equity", cat1, cat2, cat3), account
else:
account_from, account_to = n("Income", cat1, cat2, cat3), account
else: # expenses
if cat3 == "余额调整":
account_from, account_to = account, n("Equity", cat1, cat2, cat3)
else:
account_from, account_to = account, n("Expenses", cat1, cat2, cat3)
elif cat1 == "预付" or cat1 == "借出" or cat1 == "应收款":
account_from = account
match = re.search(r'「(.*?)」', cat3)
if match:
name = match.group(1)
account_to = n("Assets", name)
else:
print("error: no matched account")
elif cat1 == "借入":
account_to = account
match = re.search(r'「(.*?)」', cat3)
if match:
name = match.group(1)
account_from = n("Liabilities", name)
# account_from = n("Equity", name)
else:
print("error: no matched account")
elif cat1 == "证券买入":
account_from = account
account_to = n("Assets", "证券")
elif cat1 == "资产购入":
account_from = account
account_to = n("Assets", "房产")
# account_to = n("Equity", "房产")
elif cat1 == "贵金属买入":
account_from = account
account_to = n("Assets", "贵金属")
elif amount_in > 0:
account_from, account_to = 'Income:Miscellaneous', account
print(row)
elif amount_out > 0:
account_from, account_to = account, 'Expenses:Miscellaneous'
print(row)
else:
# print("no amount ", row)
pass
return account_from, account_to
def open_account(account, beancount_data, currency, date, opened_accounts):
if account not in opened_accounts:
beancount_data.append(f"{date} open {account}")
opened_accounts.append(account)
# Save the Beancount data to a file
def save_to_beancount_file(beancount_data, file_name):
with open(file_name, 'w') as f:
f.write("""option "title" "My Personal Ledger"
option "operating_currency" "CNY"
option "render_commas" "True"
2023-05-01 custom "fava-option" "language" "en"
2023-05-01 price USD 6.99 CNY
2023-05-01 price HKD 0.89 CNY
""")
f.write(beancount_data)
f.write("\n")
# main function to coordinate the process
def main():
# Create the parser
parser = argparse.ArgumentParser(description="A simple script")
# Add the arguments
parser.add_argument('-p', '--pinyin', action='store_true', help="Enable pinyin mode")
# Parse the arguments
args = parser.parse_args()
if args.pinyin:
global use_pinyin
use_pinyin = True
print("Pinyin mode is enabled")
csv_file = '/Users/loki/iCloud/Documents/貔貅/貔貅记账#所有交易流水.csv' # Replace this with the path to your CSV file
beancount_file = 'pixiu.beancount' # Replace this with the path where you want to save your Beancount file
df = read_data(csv_file)
beancount_data = convert_to_beancount(df, args)
save_to_beancount_file(beancount_data, beancount_file)
def read_data(csv_file):
# Read the CSV file as text
with open(csv_file, 'r') as file:
lines = file.readlines()
# Find the line containing the "---------" text
index_of_separator = next((i for i, line in enumerate(lines) if line.strip() == '------------------------'), None)
if index_of_separator is not None:
# Remove the trailing lines from the list
lines = lines[:index_of_separator]
# Join the remaining lines into a single string and read it with pandas
csv_data = ''.join(lines)
df = pd.read_csv(io.StringIO(csv_data))
else:
# No separator line found, read the entire file as CSV
df = pd.read_csv(csv_file)
return df
# run the main function
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment