Skip to content

Instantly share code, notes, and snippets.

@bartelink
Last active May 7, 2017 13:42
Show Gist options
  • Save bartelink/ab6bb881b77ba62bcfea to your computer and use it in GitHub Desktop.
Save bartelink/ab6bb881b77ba62bcfea to your computer and use it in GitHub Desktop.

Revisions

  1. bartelink renamed this gist Aug 22, 2014. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. bartelink created this gist Aug 21, 2014.
    40 changes: 40 additions & 0 deletions InMemoryStore.fs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,40 @@
    module Scenarios

    open FunUno.UnoGame // Commands, replay, handle
    open FunUno.UnoGame.Events // Digit

    open FunDomain // CommandHandler
    open FunDomain.Persistence.NEventStore.NesGateway // createInMemory, StreamId

    open Xunit

    let fullGameActions gameId = [
    StartGame { GameId=gameId; PlayerCount=4; TopCard=Digit(3, Red) }
    PlayCard { GameId=gameId; Player=0; Card=Digit(3, Blue) }
    PlayCard { GameId=gameId; Player=1; Card=Digit(8, Blue) }
    PlayCard { GameId=gameId; Player=2; Card=Digit(8, Yellow) }
    PlayCard { GameId=gameId; Player=3; Card=Digit(4, Yellow) }
    PlayCard { GameId=gameId; Player=0; Card=Digit(4, Green) } ]

    let streamId gameId = {Bucket=None; StreamId=gameId |> string}

    let [<Fact>] ``Can run a full round using NEventStore's InMemoryPersistence Protobuf`` () =
    let domainHandler = CommandHandler.create replay handle

    let store = createInMemoryProtobuf()
    let persistingHandler = domainHandler store.read store.append

    let gameId = 42
    let stream = streamId gameId

    for action in fullGameActions gameId do
    printfn "Processing %A against Stream %A" action stream
    action |> persistingHandler stream

    let [<Fact>] warmup () =
    ProtoBufAdapter.registerSerializableDu<FunUno.UnoGame.Event> ()
    ProtoBufAdapter.registerSerializableDu<FunUno.UnoGame.Events.Card> ()
    ProtoBufAdapter.registerSerializableDu<FunUno.UnoGame.Events.Color> ()

    ``Can run a full round using NEventStore's InMemoryPersistence Protobuf``()
    ``Can run a full round using NEventStore's InMemoryPersistence`` ()
    126 changes: 126 additions & 0 deletions NesGateway.fs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,126 @@
    module FunDomain.Persistence.NEventStore.NesGateway

    open FunDomain.Persistence.Serialization

    open NEventStore
    open NEventStore.Persistence
    open NEventStore.Persistence.Sql.SqlDialects

    open Microsoft.FSharp.Reflection
    open System
    open System.Collections.Generic

    /// Opaque token yielded by Streamer.read and consumed by Streamer.append
    type Token = { CommitSequence : int; StreamRevision : int}

    /// Identifier of a stream in NEventStore
    type StreamId = { Bucket: string option; StreamId: string}

    type ProtobufSerializer() =
    interface NEventStore.Serialization.ISerialize with
    member x.Deserialize<'T> input =
    ProtoBuf.Serializer.Deserialize<'T>(input)

    member x.Serialize<'T>(output, graph) =
    ProtoBuf.Serializer.Serialize<'T>(output, graph)

    module ProtoBufAdapter =
    open ProtoBuf
    open ProtoBuf.Meta
    let deserializeUnion<'a> (data:byte[]) =
    Serializer.Deserialize<'a>(new IO.MemoryStream(data))

    let serializeUnion (o:'a) =
    use stream = new IO.MemoryStream()
    Serializer.Serialize<'a>(stream, o)
    stream.ToArray()

    let registerSerializableDuInModel<'TMessage> (model:RuntimeTypeModel) =
    let baseType = model.[typeof<'TMessage>]
    for case in typeof<'TMessage> |> FSharpType.GetUnionCases do
    let caseType = case.Name |> case.DeclaringType.GetNestedType
    baseType.AddSubType(1000 + case.Tag, caseType) |> ignore
    let caseTypeModel = model.[caseType]
    caseTypeModel.Add("item").UseConstructor <- false
    baseType.CompileInPlace()

    let registerSerializableDu<'TMessage> () = registerSerializableDuInModel<'TMessage> RuntimeTypeModel.Default


    /// Wrapper yielded by create* functions with create/append functions matching FunDomain.CommandHandler requirements
    type StreamerProtobuf private (inner') =
    // Hoop jumping a la C++ pimpl pattern - if we don't do this, we're foisting an NEventStore package reference on all downstream users
    let inner : IPersistStreams = unbox inner'
    let defaultBucket bucketId = defaultArg bucketId "default"
    let load {Bucket=bucketId; StreamId=streamId} minRevision maxRevision =
    inner.GetFrom(bucketId |> defaultBucket, streamId, minRevision, maxRevision)
    let commit = inner.Commit >> ignore
    let readStream streamId startIndex count =
    let minRevision,maxRevision = startIndex,startIndex+count-1
    async {
    let commits =
    load streamId minRevision maxRevision
    |> Array.ofSeq
    let events =
    commits
    |> Seq.collect (fun ev -> ev.Events)
    |> Seq.map (fun em -> em.Body |> unbox |> ProtoBufAdapter.deserializeUnion)
    |> List.ofSeq
    let tokenOption =
    if commits.Length = 0 then
    None
    else
    let lastCommit = commits |> Seq.last
    Some {CommitSequence=lastCommit.CommitSequence; StreamRevision=lastCommit.StreamRevision}

    return events, tokenOption, None }

    let appendToStream {Bucket=bucketId; StreamId=streamId} streamMeta token events =
    let commitId,commitStamp,commitHeaders = streamMeta
    async {
    let eventMessages =
    events |> Seq.map (fun event ->
    let body = event |> ProtoBufAdapter.serializeUnion |> box
    EventMessage(Body=body))
    let updatedStreamRevision=token |> Option.map (fun token -> token.StreamRevision+1)
    let updatedCommitSequence=token |> Option.map (fun token -> token.CommitSequence+1)
    let attempt =
    CommitAttempt(
    bucketId |> defaultBucket, streamId,
    updatedStreamRevision |> defaultArg <| 1,
    commitId,
    updatedCommitSequence |> defaultArg <| 1,
    commitStamp,
    commitHeaders,
    eventMessages)
    commit attempt }

    static member internal wrap persister = StreamerProtobuf( box persister)

    member this.read stream =
    let events,version,_ =
    readStream stream 0 Int32.MaxValue
    |> Async.RunSynchronously
    version,events

    member this.append stream token events =
    let commitMetadata() =
    let commitId = Guid.NewGuid()
    let commitDateTime = DateTime.UtcNow
    let commitHeaders = null
    commitId,commitDateTime,commitHeaders
    let metadata = commitMetadata()
    appendToStream stream metadata token events
    |> Async.RunSynchronously

    let createFromStoreProtobuf (inner:IStoreEvents) =
    inner.Advanced |> StreamerProtobuf.wrap

    let createInMemoryProtobuf () =
    let serializer = ProtobufSerializer()
    Wireup.Init()
    .LogToOutputWindow()
    .UsingInMemoryPersistence()
    .UsingCustomSerialization( serializer)
    .Build()
    |> createFromStoreProtobuf
    88 changes: 88 additions & 0 deletions ProtobufNetSerialization.fs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,88 @@
    module FunDomain.Tests.ProtobufNetSerialization

    open ProtoBuf
    open ProtoBuf.Meta

    open Swensen.Unquote
    open Xunit

    open System.IO
    open Microsoft.FSharp.Reflection

    [<ProtoContract; CLIMutable>]
    type MessageA = {
    [<ProtoMember(1)>] X: string;
    [<ProtoMember(2)>] Y: int option;
    }

    [<ProtoContract>]
    [<CLIMutable>]
    type MessageB = {
    [<ProtoMember(1)>] A: string;
    [<ProtoMember(2)>] B: string;
    }

    [<ProtoContract>]
    type Message =
    | MessageA of MessageA
    | MessageB of MessageB

    let serialize msg =
    use ms = new MemoryStream()
    Serializer.SerializeWithLengthPrefix(ms, msg, PrefixStyle.Fixed32)
    ms.ToArray()

    let deserialize<'TMessage> bytes =
    use ms = new MemoryStream(buffer=bytes)
    Serializer.DeserializeWithLengthPrefix<'TMessage>(ms, PrefixStyle.Fixed32)

    let registerSerializableDuInModel<'TMessage> (model:RuntimeTypeModel) =
    let baseType = model.[typeof<'TMessage>]
    for case in typeof<'TMessage> |> FSharpType.GetUnionCases do
    let caseType = case.Name |> case.DeclaringType.GetNestedType
    baseType.AddSubType(1000 + case.Tag, caseType) |> ignore
    let caseTypeModel = model.[caseType]
    caseTypeModel.Add("item").UseConstructor <- false
    baseType.CompileInPlace()

    let registerSerializableDu<'TMessage> () = registerSerializableDuInModel<'TMessage> RuntimeTypeModel.Default

    registerSerializableDu<Message> ()

    let [<Fact>] ``MessageA roundtrips with null`` () =
    let msg = {X=null; Y=None}
    let result = serialize msg
    test <@ msg = deserialize result @>

    let [<Fact>] ``MessageA roundtrips with Empty`` () =
    let msg = {X=""; Y=None}
    let result = serialize msg
    test <@ msg = deserialize result @>

    let [<Fact>] ``MessageA roundtrips with Some`` () =
    let msg = {X="foo"; Y=Some 32}
    let result = serialize msg
    test <@ msg = deserialize result @>

    let [<Fact>] ``MessageA roundtrips with None`` () =
    let msg = {X="foo"; Y=None}
    let result = serialize msg
    test <@ msg = deserialize result @>

    let [<Fact>] ``MessageB roundtrips`` () =
    let msg = {A="bar"; B="baz"}
    let result = serialize msg
    test <@ msg = deserialize result @>

    let [<Fact>] ``roundtrip pair``() =
    let msg1 = MessageA {X="foo"; Y=Some 32}
    let msg1' = msg1 |> serialize |> deserialize
    test <@ msg1' = msg1 @>

    let msg2 = MessageB {A="bar"; B="baz"}
    let msg2' = msg2 |> serialize |> deserialize
    test <@ msg2' = msg2 @>

    let [<Fact>] many() =
    for _ in 1..1000 do
    ``roundtrip pair``()
    88 changes: 88 additions & 0 deletions ProtobufNetSerializationFacts.fs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,88 @@
    module FunDomain.Tests.ProtobufNetSerializationFacts

    open ProtoBuf
    open ProtoBuf.Meta

    open Swensen.Unquote
    open Xunit

    open System.IO
    open Microsoft.FSharp.Reflection

    [<ProtoContract; CLIMutable>]
    type MessageA = {
    [<ProtoMember(1)>] X: string;
    [<ProtoMember(2)>] Y: int option;
    }

    [<ProtoContract>]
    [<CLIMutable>]
    type MessageB = {
    [<ProtoMember(1)>] A: string;
    [<ProtoMember(2)>] B: string;
    }

    [<ProtoContract>]
    type Message =
    | MessageA of MessageA
    | MessageB of MessageB

    let serialize msg =
    use ms = new MemoryStream()
    Serializer.SerializeWithLengthPrefix(ms, msg, PrefixStyle.Fixed32)
    ms.ToArray()

    let deserialize<'TMessage> bytes =
    use ms = new MemoryStream(buffer=bytes)
    Serializer.DeserializeWithLengthPrefix<'TMessage>(ms, PrefixStyle.Fixed32)

    let registerSerializableDuInModel<'TMessage> (model:RuntimeTypeModel) =
    let baseType = model.[typeof<'TMessage>]
    for case in typeof<'TMessage> |> FSharpType.GetUnionCases do
    let caseType = case.Name |> case.DeclaringType.GetNestedType
    baseType.AddSubType(1000 + case.Tag, caseType) |> ignore
    let caseTypeModel = model.[caseType]
    caseTypeModel.Add("item").UseConstructor <- false
    baseType.CompileInPlace()

    let registerSerializableDu<'TMessage> () = registerSerializableDuInModel<'TMessage> RuntimeTypeModel.Default

    registerSerializableDu<Message> ()

    let [<Fact>] ``MessageA roundtrips with null`` () =
    let msg = {X=null; Y=None}
    let result = serialize msg
    test <@ msg = deserialize result @>

    let [<Fact>] ``MessageA roundtrips with Empty`` () =
    let msg = {X=""; Y=None}
    let result = serialize msg
    test <@ msg = deserialize result @>

    let [<Fact>] ``MessageA roundtrips with Some`` () =
    let msg = {X="foo"; Y=Some 32}
    let result = serialize msg
    test <@ msg = deserialize result @>

    let [<Fact>] ``MessageA roundtrips with None`` () =
    let msg = {X="foo"; Y=None}
    let result = serialize msg
    test <@ msg = deserialize result @>

    let [<Fact>] ``MessageB roundtrips`` () =
    let msg = {A="bar"; B="baz"}
    let result = serialize msg
    test <@ msg = deserialize result @>

    let [<Fact>] ``roundtrip pair``() =
    let msg1 = MessageA {X="foo"; Y=Some 32}
    let msg1' = msg1 |> serialize |> deserialize
    test <@ msg1' = msg1 @>

    let msg2 = MessageB {A="bar"; B="baz"}
    let msg2' = msg2 |> serialize |> deserialize
    test <@ msg2' = msg2 @>

    let [<Fact>] many() =
    for _ in 1..1000 do
    ``roundtrip pair``()
    30 changes: 30 additions & 0 deletions UnoGame.fs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,30 @@
    namespace FunUno

    open ProtoBuf

    module UnoGame =
    module Events =
    type [<ProtoContract; CLIMutable>] GameStartedEvent = {
    [<ProtoMember(1)>] GameId: int;
    [<ProtoMember(2)>] PlayerCount:int;
    [<ProtoMember(3)>] TopCard: Card}

    and [<ProtoContract; CLIMutable>] CardPlayedEvent = {
    [<ProtoMember(1)>] GameId: int;
    [<ProtoMember(2)>] Player:int;
    [<ProtoMember(3)>] Card: Card}

    and [<ProtoContract>] Color =
    | Red
    | Green
    | Blue
    | Yellow

    and [<ProtoContract>] Card =
    | Digit of DigitCard
    | Kickback of KickbackCard

    [<ProtoContract>]
    type Event =
    | GameStarted of Events.GameStartedEvent
    | CardPlayed of Events.CardPlayedEvent