Created
June 19, 2024 14:11
-
-
Save odytrice/08bd3474cf91b0bedf6bf8f35b655e8d to your computer and use it in GitHub Desktop.
Revisions
-
odytrice created this gist
Jun 19, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,285 @@ module Nomad.Bitcoin.Indexer.BitcoinActors open System open System.Linq open Akka.FSharp open Microsoft.Extensions.DependencyInjection open Microsoft.Extensions.Logging open Nomad.Bitcoin.Core.Contracts open Nomad.Bitcoin.Core.Domain open Nomad.Bitcoin.Core.Domain.Errors open Nomad.Bitcoin.Core.Domain.Types open FsToolkit.ErrorHandling open Nomad.Bitcoin.Infrastructure.Database open Nomad.Bitcoin.Core.Domain.Helpers type Msg = | StartIndex type Context = abstract member Index: IWriteIndexRepository with get abstract member Node: INodeRepository with get abstract member Logger: ILogger<Context> with get abstract member Database: BitcoinDataContext with get module Context = let create (sp: IServiceProvider) = { new Context with member _.Index = sp.GetService<IWriteIndexRepository>() member _.Node = sp.GetService<INodeRepository>() member _.Database = sp.GetService<BitcoinDataContext>() member _.Logger = sp.GetService<ILogger<Context>>() } let fetchOutput (indexRepo: IWriteIndexRepository) (input: TransactionInput) = asyncResult { match input with | CoinBase _ -> return input, None | TxInput txInput -> let! output = indexRepo.GetOutputByIndex(txInput.TransactionId, txInput.OutputIndex) return input, Some output } let indexTransaction (context: Context) (addressMap: Map<string, int64>) (blockHash: BlockHash, blockTime: DateTimeOffset) (transaction: TransactionDetails) = asyncResult { // Create All Input Indices let inputIndices : InputIndex list = [ for input in transaction.Inputs do match input with | CoinBase coinBaseDetails -> yield { OutputId = None InputId = 0L TransactionId = transaction.Id Sequence = coinBaseDetails.Sequence IsCoinBase = true OutputTransactionId = None OutputTransactionIndex = None } | TxInput txInputDetails -> yield { OutputTransactionIndex = txInputDetails.OutputIndex |> Some OutputTransactionId = txInputDetails.TransactionId |> Some OutputId = None InputId = 0L TransactionId = transaction.Id Sequence = txInputDetails.Sequence IsCoinBase = false } ] // Create Output Indices let outputIndices: OutputIndex list = [ for output in transaction.Outputs do let addresses = output.ScriptPubKey |> Helpers.parseAddresses yield { OutputId = 0L TransactionId = transaction.Hash Index = output.Index Value = decimal output.Value ScriptPubKey = output.ScriptPubKey.Asm Addresses = [ for address in addresses do // Locate Address From Address Map, so we can get the ID of the Address let idOption = addressMap |> Map.tryFind address if idOption.IsSome then yield { Id = idOption.Value Value = address Balance = 0M<Bitcoins> TransactionCount = 0 Date = blockTime } ] } ] // Construct A Single Transaction let indexTransaction: TransactionIndex = { TransactionId = transaction.Id BlockHash = blockHash Inputs = inputIndices Outputs = outputIndices } do! context.Index.AddTransaction indexTransaction } let indexTransactions (context: Context) (addressMap: Map<string, int64>) (blockDetails: BlockDetails) = asyncResult { let logger = context.Logger logger.LogInformation("Saving Block Transactions") // Get Already Indexed Transactions By Block let! indexedTransactionIds = blockDetails.Hash |> context.Index.GetTransactionsByBlock |> AsyncResult.map(List.map(fun t -> t.TransactionId)) // Filter Pending Transactions let pendingTransactions = blockDetails.Transactions |> List.filter(fun t -> not <| indexedTransactionIds.Contains(t.Id)) for transaction in pendingTransactions do let blockData = blockDetails.Hash, blockDetails.Time do! indexTransaction context addressMap blockData transaction } /// Creates a map of addresses let createAddressMap (index: IWriteIndexRepository) (blockTime: DateTimeOffset) (addresses: string list): Async<Result<Map<string, int64>,AppError>> = asyncResult { // Create a Set to remove Duplicates let addressSet = Set addresses // Fetch Existing Addresses from Index let! existingAddresses = addressSet |> List.ofSeq |> index.GetAddresses // Create Address set using existing Addresses let existingAddressSet = existingAddresses |> List.map _.Value |> Set // Extract new addresses and save them to the index let newAddressSet = addressSet - existingAddressSet let! newAddressRecords = newAddressSet |> Set.map (fun a -> { AddressIndex.Id = 0L AddressIndex.Value = a AddressIndex.Date = blockTime Balance = 0M<Bitcoins> TransactionCount = 0 }) |> List.ofSeq |> index.SaveAddresses // Return existing and new addresses Combined with their Ids return newAddressRecords @ existingAddresses |> List.map (fun address -> address.Value, address.Id) |> Map } let indexBlock (sp: IServiceProvider) (height: int) = asyncResult { use scope = sp.CreateScope() let context = Context.create scope.ServiceProvider let logger = context.Logger // Fetch Block Details let! blockHash = context.Node.GetBlockHash height logger.LogInformation("Fetching Block {Hash}", blockHash) let! blockDetails = context.Node.GetBlockDetails blockHash logger.LogInformation("Successfully Fetched Block") // Extract All Addresses from the Block let allAddresses = blockDetails.Transactions |> List.collect _.Outputs |> List.map _.ScriptPubKey |> List.collect Helpers.parseAddresses // Create AddressMap or "AddressBook" let! addressMap = allAddresses |> createAddressMap context.Index blockDetails.Time logger.LogInformation("Saving Block") let! existing = context.Index.GetBlock(blockDetails.Hash) match existing with | None -> do! context.Index.AddBlock(blockDetails) | Some block -> logger.LogWarning("Block {Height} has already been indexed", block.Height) do! indexTransactions context addressMap blockDetails logger.LogInformation("Marking Block {Height} as Completed", blockDetails.Height) do! context.Index.SetBlockStatus(blockDetails.Hash, IndexStatus.Complete); } let indexPendingBlocks (sp: IServiceProvider) (context: Context) = asyncResult { let! pendingBlocks = context.Index.GetPendingBlocks() if pendingBlocks.Length > 0 then context.Logger.LogInformation("Found {count} Pending Blocks", pendingBlocks.Length) let! missingBlocks = context.Index.GetMissingBlocks() if missingBlocks.Length > 0 then context.Logger.LogInformation("Found {count} Missing Blocks", missingBlocks.Length) return! missingBlocks @ pendingBlocks |> List.map(indexBlock sp) |> List.sequenceAsyncResultM } let rootActor (sp: IServiceProvider) (mailbox:Actor<Msg>) = let indexChain (context: Context) = asyncResult { let! chainInfo = context.Node.GetChainInfo() let nodeBlockHeight = chainInfo.Blocks let! indexBlockHeight = context.Index.GetBlockChainHeight() let! pendingBlocks = indexPendingBlocks sp context if nodeBlockHeight > indexBlockHeight then context.Logger.LogInformation("Current Block Height {nodeBlockHeight}\n", nodeBlockHeight) let batches = [ indexBlockHeight .. nodeBlockHeight ] |> List.batchesOf 10 for batch in batches do let startIndex , endIndex = batch.Head, batch |> List.last context.Logger.LogInformation("Processing Batch {Start} .. {End}", startIndex, endIndex) // Run Block Index in Parallel let! results = batch |> List.map(indexBlock sp) |> Async.Parallel // If any fails, Collect the Errors into an Aggregate Error do! results |> List.ofArray |> List.sequenceResultA |> Result.mapError AggregateError |> Result.ignore context.Logger.LogInformation("Consolidating Detached Inputs") do! context.Index.ConsolidateInputs() |> AsyncResult.ignore let percentageCompleted = float endIndex / float nodeBlockHeight * 100.0 context.Logger.LogInformation("Processing Complete for Batch. Overall Index is {percentageComplete:N4}% Complete\n\n", percentageCompleted) return nodeBlockHeight - indexBlockHeight + pendingBlocks.Length else return pendingBlocks.Length } let rec loop () = actor { use scope = sp.CreateScope() let! msg = mailbox.Receive() let context = Context.create scope.ServiceProvider match msg with | StartIndex -> let blocksIndexed = indexChain context |> Async.RunSynchronously match blocksIndexed with | Ok 0 -> context.Logger.LogInformation "No Blocks were processed, Index is up to date" | Ok blocks -> context.Logger.LogInformation("{blocks} Blocks were processed", blocks) | Error appError -> let msg = describeError appError context.Logger.LogError ("An Error Occurred indexing Blocks {msg}", msg) return! loop() } loop()