Skip to content

Instantly share code, notes, and snippets.

@odytrice
Created June 19, 2024 14:11
Show Gist options
  • Save odytrice/08bd3474cf91b0bedf6bf8f35b655e8d to your computer and use it in GitHub Desktop.
Save odytrice/08bd3474cf91b0bedf6bf8f35b655e8d to your computer and use it in GitHub Desktop.

Revisions

  1. odytrice created this gist Jun 19, 2024.
    285 changes: 285 additions & 0 deletions BitcoinActor.fs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,285 @@
    module Nomad.Bitcoin.Indexer.BitcoinActors

    open System
    open System.Linq
    open Akka.FSharp
    open Microsoft.Extensions.DependencyInjection
    open Microsoft.Extensions.Logging
    open Nomad.Bitcoin.Core.Contracts
    open Nomad.Bitcoin.Core.Domain
    open Nomad.Bitcoin.Core.Domain.Errors
    open Nomad.Bitcoin.Core.Domain.Types
    open FsToolkit.ErrorHandling
    open Nomad.Bitcoin.Infrastructure.Database
    open Nomad.Bitcoin.Core.Domain.Helpers

    type Msg =
    | StartIndex

    type Context =
    abstract member Index: IWriteIndexRepository with get
    abstract member Node: INodeRepository with get
    abstract member Logger: ILogger<Context> with get
    abstract member Database: BitcoinDataContext with get


    module Context =
    let create (sp: IServiceProvider) =
    {
    new Context with
    member _.Index = sp.GetService<IWriteIndexRepository>()
    member _.Node = sp.GetService<INodeRepository>()
    member _.Database = sp.GetService<BitcoinDataContext>()
    member _.Logger = sp.GetService<ILogger<Context>>()
    }

    let fetchOutput (indexRepo: IWriteIndexRepository) (input: TransactionInput) =
    asyncResult {
    match input with
    | CoinBase _ -> return input, None
    | TxInput txInput ->
    let! output = indexRepo.GetOutputByIndex(txInput.TransactionId, txInput.OutputIndex)
    return input, Some output
    }

    let indexTransaction (context: Context) (addressMap: Map<string, int64>) (blockHash: BlockHash, blockTime: DateTimeOffset) (transaction: TransactionDetails) =
    asyncResult {
    // Create All Input Indices
    let inputIndices : InputIndex list = [
    for input in transaction.Inputs do
    match input with
    | CoinBase coinBaseDetails ->
    yield {
    OutputId = None
    InputId = 0L
    TransactionId = transaction.Id
    Sequence = coinBaseDetails.Sequence
    IsCoinBase = true
    OutputTransactionId = None
    OutputTransactionIndex = None
    }
    | TxInput txInputDetails ->
    yield {
    OutputTransactionIndex = txInputDetails.OutputIndex |> Some
    OutputTransactionId = txInputDetails.TransactionId |> Some
    OutputId = None
    InputId = 0L
    TransactionId = transaction.Id
    Sequence = txInputDetails.Sequence
    IsCoinBase = false
    }
    ]

    // Create Output Indices
    let outputIndices: OutputIndex list = [
    for output in transaction.Outputs do
    let addresses = output.ScriptPubKey |> Helpers.parseAddresses
    yield {
    OutputId = 0L
    TransactionId = transaction.Hash
    Index = output.Index
    Value = decimal output.Value
    ScriptPubKey = output.ScriptPubKey.Asm
    Addresses = [
    for address in addresses do
    // Locate Address From Address Map, so we can get the ID of the Address
    let idOption = addressMap |> Map.tryFind address
    if idOption.IsSome then
    yield {
    Id = idOption.Value
    Value = address
    Balance = 0M<Bitcoins>
    TransactionCount = 0
    Date = blockTime
    }
    ]
    }
    ]

    // Construct A Single Transaction
    let indexTransaction: TransactionIndex = {
    TransactionId = transaction.Id
    BlockHash = blockHash
    Inputs = inputIndices
    Outputs = outputIndices
    }

    do! context.Index.AddTransaction indexTransaction
    }

    let indexTransactions (context: Context) (addressMap: Map<string, int64>) (blockDetails: BlockDetails) =
    asyncResult {
    let logger = context.Logger
    logger.LogInformation("Saving Block Transactions")

    // Get Already Indexed Transactions By Block
    let! indexedTransactionIds =
    blockDetails.Hash
    |> context.Index.GetTransactionsByBlock
    |> AsyncResult.map(List.map(fun t -> t.TransactionId))

    // Filter Pending Transactions
    let pendingTransactions =
    blockDetails.Transactions
    |> List.filter(fun t -> not <| indexedTransactionIds.Contains(t.Id))

    for transaction in pendingTransactions do
    let blockData = blockDetails.Hash, blockDetails.Time
    do! indexTransaction context addressMap blockData transaction
    }

    /// Creates a map of addresses
    let createAddressMap (index: IWriteIndexRepository) (blockTime: DateTimeOffset) (addresses: string list): Async<Result<Map<string, int64>,AppError>> =
    asyncResult {
    // Create a Set to remove Duplicates
    let addressSet = Set addresses

    // Fetch Existing Addresses from Index
    let! existingAddresses =
    addressSet
    |> List.ofSeq
    |> index.GetAddresses

    // Create Address set using existing Addresses
    let existingAddressSet =
    existingAddresses
    |> List.map _.Value
    |> Set

    // Extract new addresses and save them to the index
    let newAddressSet = addressSet - existingAddressSet
    let! newAddressRecords =
    newAddressSet
    |> Set.map (fun a -> {
    AddressIndex.Id = 0L
    AddressIndex.Value = a
    AddressIndex.Date = blockTime
    Balance = 0M<Bitcoins>
    TransactionCount = 0
    })
    |> List.ofSeq
    |> index.SaveAddresses

    // Return existing and new addresses Combined with their Ids
    return
    newAddressRecords @ existingAddresses
    |> List.map (fun address -> address.Value, address.Id)
    |> Map
    }

    let indexBlock (sp: IServiceProvider) (height: int) =
    asyncResult {
    use scope = sp.CreateScope()
    let context = Context.create scope.ServiceProvider
    let logger = context.Logger

    // Fetch Block Details
    let! blockHash = context.Node.GetBlockHash height

    logger.LogInformation("Fetching Block {Hash}", blockHash)
    let! blockDetails = context.Node.GetBlockDetails blockHash

    logger.LogInformation("Successfully Fetched Block")

    // Extract All Addresses from the Block
    let allAddresses =
    blockDetails.Transactions
    |> List.collect _.Outputs
    |> List.map _.ScriptPubKey
    |> List.collect Helpers.parseAddresses

    // Create AddressMap or "AddressBook"
    let! addressMap =
    allAddresses
    |> createAddressMap context.Index blockDetails.Time

    logger.LogInformation("Saving Block")
    let! existing = context.Index.GetBlock(blockDetails.Hash)
    match existing with
    | None -> do! context.Index.AddBlock(blockDetails)
    | Some block -> logger.LogWarning("Block {Height} has already been indexed", block.Height)

    do! indexTransactions context addressMap blockDetails

    logger.LogInformation("Marking Block {Height} as Completed", blockDetails.Height)
    do! context.Index.SetBlockStatus(blockDetails.Hash, IndexStatus.Complete);
    }

    let indexPendingBlocks (sp: IServiceProvider) (context: Context) =
    asyncResult {
    let! pendingBlocks = context.Index.GetPendingBlocks()
    if pendingBlocks.Length > 0 then
    context.Logger.LogInformation("Found {count} Pending Blocks", pendingBlocks.Length)

    let! missingBlocks = context.Index.GetMissingBlocks()
    if missingBlocks.Length > 0 then
    context.Logger.LogInformation("Found {count} Missing Blocks", missingBlocks.Length)

    return!
    missingBlocks @ pendingBlocks
    |> List.map(indexBlock sp)
    |> List.sequenceAsyncResultM
    }

    let rootActor (sp: IServiceProvider) (mailbox:Actor<Msg>) =

    let indexChain (context: Context) =
    asyncResult {
    let! chainInfo = context.Node.GetChainInfo()
    let nodeBlockHeight = chainInfo.Blocks
    let! indexBlockHeight = context.Index.GetBlockChainHeight()

    let! pendingBlocks = indexPendingBlocks sp context

    if nodeBlockHeight > indexBlockHeight then
    context.Logger.LogInformation("Current Block Height {nodeBlockHeight}\n", nodeBlockHeight)
    let batches = [ indexBlockHeight .. nodeBlockHeight ] |> List.batchesOf 10
    for batch in batches do
    let startIndex , endIndex = batch.Head, batch |> List.last
    context.Logger.LogInformation("Processing Batch {Start} .. {End}", startIndex, endIndex)

    // Run Block Index in Parallel
    let! results =
    batch
    |> List.map(indexBlock sp)
    |> Async.Parallel

    // If any fails, Collect the Errors into an Aggregate Error
    do! results
    |> List.ofArray
    |> List.sequenceResultA
    |> Result.mapError AggregateError
    |> Result.ignore

    context.Logger.LogInformation("Consolidating Detached Inputs")
    do! context.Index.ConsolidateInputs() |> AsyncResult.ignore

    let percentageCompleted = float endIndex / float nodeBlockHeight * 100.0
    context.Logger.LogInformation("Processing Complete for Batch. Overall Index is {percentageComplete:N4}% Complete\n\n", percentageCompleted)

    return nodeBlockHeight - indexBlockHeight + pendingBlocks.Length
    else
    return pendingBlocks.Length
    }

    let rec loop () =
    actor {
    use scope = sp.CreateScope()
    let! msg = mailbox.Receive()

    let context = Context.create scope.ServiceProvider

    match msg with
    | StartIndex ->
    let blocksIndexed =
    indexChain context
    |> Async.RunSynchronously
    match blocksIndexed with
    | Ok 0 -> context.Logger.LogInformation "No Blocks were processed, Index is up to date"
    | Ok blocks -> context.Logger.LogInformation("{blocks} Blocks were processed", blocks)
    | Error appError ->
    let msg = describeError appError
    context.Logger.LogError ("An Error Occurred indexing Blocks {msg}", msg)
    return! loop()
    }
    loop()