using System.Threading.Tasks; using System.IO.MemoryMappedFiles; using System.Runtime.InteropServices; using System.Security.Cryptography; using System.Runtime.Serialization; using System.Runtime.Serialization.Formatters.Binary; async Task Main() { using (var channel = DemoServer()) await DemoClient(); } IDisposable DemoServer() => new FastChannel ("test", true, GetType().Module); async Task DemoClient() { using (var channel = new FastChannel ("test", false, GetType().Module)) { Proxy fooProxy = await channel.Activate(); int remoteProcessID = await fooProxy.Eval (foo => foo.ProcessID); remoteProcessID.Dump ("Remote process ID"); int sum = await fooProxy.Eval (foo => foo.Add (2, 2)); sum.Dump ("Sum"); int sum2 = await fooProxy.Eval (foo => foo.AddAsync (3, 3)); sum2.Dump ("Sum"); #region Marshaling Proxy barProxy = await channel.Activate(); Proxy fooProxy2 = await barProxy.Eval (b => b.GetFoo()); (await fooProxy2.Eval (foo => foo.Add (2, 2))).Dump(); await barProxy.Run (b => b.PrintFoo (fooProxy2)); await barProxy.Run (b => b.PrintFoo (new Foo())); #endregion } } class Foo { public int ProcessID => Process.GetCurrentProcess().Id; public int Add (int x, int y) => x + y; public async Task AddAsync (int x, int y) { await Task.Delay(1000); return x + y; } } #region Marshaling class Bar { public Proxy GetFoo() => new Foo(); public async Task PrintFoo (Proxy foo) { int fooID = await foo.Eval (f => f.ProcessID); $"Foo's process ID is {fooID}".Dump(); } } #endregion #region FastChannel class FastChannel : SafeDisposable { static int _lastMessageID, _lastObjectID; public readonly byte DomainAddress; readonly OutPipe _outPipe; readonly InPipe _inPipe; readonly Module _module; // The module for which serialization is optimized. readonly object _proxiesLock = new object (); readonly Dictionary _proxiesByID = new Dictionary (); readonly Dictionary _proxiesByObject = new Dictionary (); readonly Dictionary> _pendingReplies = new Dictionary> (); int _messagesReceived; public int MessagesReceived { get { return _messagesReceived; } } enum MessageType : byte { Activation, Deactivation, MethodCall, ReturnValue, ReturnException } public FastChannel (string name, bool isOwner, Module module) { _module = module; // Types belonging to this module will serialize faster DomainAddress = (byte)(isOwner ? 1 : 2); _outPipe = new OutPipe (name + (isOwner ? ".A" : ".B"), isOwner); _inPipe = new InPipe (name + (isOwner ? ".B" : ".A"), isOwner, OnMessageReceived); } protected override void DisposeCore() { base.DisposeCore(); lock (_proxiesLock) { _proxiesByID.Clear (); _proxiesByObject.Clear (); } _inPipe.Dispose(); _outPipe.Dispose(); } IProxy FindProxy (int objectID) { IProxy proxy; lock (_proxiesLock) _proxiesByID.TryGetValue (objectID, out proxy); return proxy; } IProxy RegisterLocalProxy (IProxy proxy) { lock (_proxiesLock) { // Avoid multiple identities for the same object: IProxy existingProxy; bool alreadyThere = _proxiesByObject.TryGetValue (proxy.LocalInstanceUntyped, out existingProxy); if (alreadyThere) return existingProxy; int objectID = Interlocked.Increment (ref _lastObjectID); proxy.RegisterLocal (this, objectID, () => UnregisterLocalProxy (proxy, objectID)); _proxiesByID [objectID] = proxy; _proxiesByObject [proxy] = proxy; return proxy; } } void UnregisterLocalProxy (IProxy proxy, int objectID) { lock (_proxiesLock) { _proxiesByID.Remove (objectID); _proxiesByObject.Remove (proxy.LocalInstanceUntyped); } } /// Instantiates an object remotely. To release it, you can either call Disconnect on the proxy returned /// or wait for its finalizer to do the same.s public Task> Activate() where TRemote : class { lock (DisposeLock) { AssertSafe(); int messageNumber = Interlocked.Increment (ref _lastMessageID); var ms = new MemoryStream (); var writer = new BinaryWriter (ms); writer.Write ((byte)MessageType.Activation); writer.Write (messageNumber); SerializeType (writer, typeof(TRemote)); writer.Flush (); var task = GetResponseFuture> (messageNumber); _outPipe.Write (ms.ToArray ()); return task; } } internal void InternalDeactivate (int objectID) { var ms = new MemoryStream (); var writer = new BinaryWriter (ms); writer.Write ((byte)MessageType.Deactivation); writer.Write (objectID); writer.Flush (); lock (DisposeLock) { if (IsDisposed) return; _outPipe.Write (ms.ToArray ()); } } internal Task SendMethodCall (Expression expressionBody, int objectID, bool awaitRemoteTask) { lock (DisposeLock) { AssertSafe(); int messageNumber = Interlocked.Increment (ref _lastMessageID); byte [] payload = SerializeMethodCall (expressionBody, messageNumber, objectID, awaitRemoteTask); var task = GetResponseFuture (messageNumber); _outPipe.Write (payload); return task; } } Task GetResponseFuture (int messageNumber) { var tcs = new TaskCompletionSource (); lock (_pendingReplies) _pendingReplies.Add (messageNumber, (msgType, value) => { if (msgType == MessageType.ReturnValue) tcs.SetResult ((T)value); else { var ex = (Exception)value; MethodInfo preserveStackTrace = typeof(Exception).GetMethod ("InternalPreserveStackTrace", BindingFlags.Instance | BindingFlags.NonPublic); if (preserveStackTrace != null) preserveStackTrace.Invoke (ex, null); tcs.SetException (ex); } }); return tcs.Task; } void OnMessageReceived (byte[] data) { Interlocked.Increment (ref _messagesReceived); lock (DisposeLock) { if (IsDisposed) return; var ms = new MemoryStream (data); var reader = new BinaryReader (ms); var messageType = (MessageType)reader.ReadByte(); if (messageType == MessageType.ReturnValue || messageType == MessageType.ReturnException) ReceiveReply (messageType, reader); else if (messageType == MessageType.MethodCall) ReceiveMethodCall (reader); else if (messageType == MessageType.Activation) ReceiveActivation (reader); else if (messageType == MessageType.Deactivation) ReceiveDeactivation (reader); } } void ReceiveReply (MessageType messageType, BinaryReader reader) { int msgNumber = reader.ReadInt32 (); object value = DeserializeValue (reader); Action reply; lock (_pendingReplies) { if (!_pendingReplies.TryGetValue (msgNumber, out reply)) return; // Orphan reply _pendingReplies.Remove (msgNumber); } reply (messageType, value); } void ReceiveMethodCall (BinaryReader reader) { int messageNumber = reader.ReadInt32 (); int objectID = reader.ReadInt32 (); bool awaitRemoteTask = reader.ReadBoolean(); var method = DeserializeMethod (reader); Exception error = null; object returnValue = null; object [] args = null; IProxy proxy = null; try { args = DeserializeArguments (reader, method.GetParameters ()).ToArray (); proxy = FindProxy (objectID); if (proxy == null) throw new ObjectDisposedException ("Proxy " + objectID + " has been disposed."); } catch (Exception ex) { error = ex; } Task.Factory.StartNew (() => { if (error == null) try { var instance = proxy.LocalInstanceUntyped; if (instance == null) { string typeInfo = proxy.ObjectType == null ? "?" : proxy.ObjectType.FullName; error = new ObjectDisposedException ("Proxy " + objectID + " is disposed. Type: " + typeInfo); } else returnValue = method.Invoke (instance, args); } catch (Exception ex) { if (ex is TargetInvocationException) error = ex.InnerException; else error = ex; } SendReply (messageNumber, returnValue, error, awaitRemoteTask); }, TaskCreationOptions.PreferFairness); } void ReceiveActivation (BinaryReader reader) { int messageNumber = reader.ReadInt32 (); object instance = null; Exception error = null; try { var type = DeserializeType (reader); instance = Activator.CreateInstance (type, true); var proxy = (IProxy)Activator.CreateInstance (typeof(Proxy<>).MakeGenericType (type), BindingFlags.Instance | BindingFlags.NonPublic, null, new [] { instance, DomainAddress }, null); instance = RegisterLocalProxy (proxy); } catch (Exception ex) { error = ex; } SendReply (messageNumber, instance, error, false); } void SendReply (int messageNumber, object returnValue, Exception error, bool awaitRemoteTask) { lock (DisposeLock) { if (IsDisposed) return; if (awaitRemoteTask) { var returnTask = (Task)returnValue; // The method we're calling is itself asynchronous. Delay sending a reply until the method itself completes. returnTask.ContinueWith (ant => SendReply (messageNumber, ant.IsFaulted ? null : ant.GetUntypedResult (), ant.Exception, false)); return; } var ms = new MemoryStream (); var writer = new BinaryWriter (ms); writer.Write ((byte)(error == null ? MessageType.ReturnValue : MessageType.ReturnException)); writer.Write (messageNumber); SerializeValue (writer, error ?? returnValue); writer.Flush (); _outPipe.Write (ms.ToArray ()); } } void ReceiveDeactivation (BinaryReader reader) { int objectID = reader.ReadInt32 (); lock (_proxiesLock) { var proxy = FindProxy (objectID); if (proxy == null) return; proxy.Disconnect (); } } byte[] SerializeMethodCall (Expression expr, int messageNumber, int objectID, bool awaitRemoteTask) { if (expr == null) throw new ArgumentNullException ("expr"); MethodInfo method; IEnumerable args = new Expression [0]; var mc = expr as MethodCallExpression; if (mc != null) { method = mc.Method; args = mc.Arguments; } else { var me = expr as MemberExpression; if (me != null && me.Member is PropertyInfo) method = ((PropertyInfo)me.Member).GetGetMethod (); else throw new InvalidOperationException ("Only method calls and property reads can be serialized"); } var ms = new MemoryStream (); var writer = new BinaryWriter (ms); writer.Write ((byte)MessageType.MethodCall); writer.Write (messageNumber); writer.Write (objectID); writer.Write (awaitRemoteTask); SerializeMethod (writer, method); SerializeArguments (writer, args.Select (a => GetExprValue (a, true)).ToArray ()); writer.Flush (); return ms.ToArray (); } void SerializeArguments (BinaryWriter writer, object [] args) { writer.Write ((byte)args.Length); foreach (var o in args) SerializeValue (writer, o); } IEnumerable DeserializeArguments (BinaryReader reader, ParameterInfo [] args) { byte objectCount = reader.ReadByte (); for (int i = 0; i < objectCount; i++) yield return DeserializeValue (reader, args [i].ParameterType); } enum FastTypeCode : byte { Null, False, True, Byte, Char, String, Int32, Proxy, Other } void SerializeValue (BinaryWriter writer, object o) { if (o == null) { writer.Write ((byte)FastTypeCode.Null); } else if (o is bool) { writer.Write ((byte)((bool)o ? FastTypeCode.True : FastTypeCode.False)); } else if (o is byte) { writer.Write ((byte)FastTypeCode.Byte); writer.Write ((byte)o); } else if (o is char) { writer.Write ((byte)FastTypeCode.Char); writer.Write ((char)o); } else if (o is string) { writer.Write ((byte)FastTypeCode.String); writer.Write ((string)o); } else if (o is int) { writer.Write ((byte)FastTypeCode.Int32); writer.Write ((int)o); } else if (o is IProxy) { writer.Write ((byte)FastTypeCode.Proxy); var proxy = (IProxy)o; if (proxy.LocalInstanceUntyped != null) proxy = RegisterLocalProxy (proxy); var typeArgType = o.GetType().GetGenericArguments() [0]; SerializeType (writer, typeArgType); SerializeType (writer, proxy.LocalInstanceUntyped == null ? typeArgType : proxy.LocalInstanceUntyped.GetType()); writer.Write (proxy.ObjectID.Value); // The domain address will be zero if created via implicit conversion. writer.Write (proxy.DomainAddress == 0 ? DomainAddress : proxy.DomainAddress); } else { writer.Write ((byte)FastTypeCode.Other); writer.Flush (); new BinaryFormatter ().Serialize (writer.BaseStream, o); } } object DeserializeValue (BinaryReader reader, Type expectedType = null) { var typeCode = (FastTypeCode)reader.ReadByte (); if (typeCode == FastTypeCode.Null) return null; if (typeCode == FastTypeCode.False) return false; if (typeCode == FastTypeCode.True) return true; if (typeCode == FastTypeCode.Byte) return reader.ReadByte (); if (typeCode == FastTypeCode.Char) return reader.ReadChar (); if (typeCode == FastTypeCode.String) return reader.ReadString (); if (typeCode == FastTypeCode.Int32) return reader.ReadInt32 (); if (typeCode == FastTypeCode.Proxy) { Type genericType = DeserializeType (reader); Type actualType = DeserializeType (reader); int objectID = reader.ReadInt32 (); byte domainAddress = reader.ReadByte (); if (domainAddress == DomainAddress) // We own the real object { var proxy = FindProxy (objectID); if (proxy == null) throw new ObjectDisposedException ("Cannot deserialize type '" + genericType.Name + "' - object has been disposed"); // Automatically unmarshal if necessary: if (expectedType != null && expectedType.IsInstanceOfType (proxy.LocalInstanceUntyped)) return proxy.LocalInstanceUntyped; return proxy; } // The other domain owns the object. var proxyType = typeof(Proxy<>).MakeGenericType (genericType); return Activator.CreateInstance (proxyType, BindingFlags.NonPublic | BindingFlags.Instance, null, new object [] { this, objectID, domainAddress, actualType }, null); } return new BinaryFormatter ().Deserialize (reader.BaseStream); } void SerializeType (BinaryWriter writer, Type t) { if (t.Module == _module) { writer.Write ((byte)1); writer.Write (t.MetadataToken); } else { writer.Write ((byte)2); writer.Write (t.AssemblyQualifiedName); } } Type DeserializeType (BinaryReader reader) { int b = reader.ReadByte(); if (b == 1) return _module.ResolveType (reader.ReadInt32()); else return Type.GetType (reader.ReadString()); } void SerializeMethod (BinaryWriter writer, MethodInfo mi) { if (mi.Module == _module) { writer.Write ((byte)1); writer.Write (mi.MetadataToken); } else { writer.Write ((byte)2); writer.Write (mi.DeclaringType.AssemblyQualifiedName); writer.Write (mi.MetadataToken); } } MethodBase DeserializeMethod (BinaryReader reader) { int b = reader.ReadByte (); if (b == 1) return _module.ResolveMethod (reader.ReadInt32 ()); else return Type.GetType (reader.ReadString (), true).Module.ResolveMethod (reader.ReadInt32()); } /// Evalulates an expression tree quickly on the local side without the cost of calling Compile(). /// This works only with simple method calls and property reads. In other cases, it returns null. public static Func FastEvalExpr(Expression body) { // Optimize common cases: MethodInfo method; IEnumerable args = new Expression [0]; var mc = body as MethodCallExpression; if (mc != null) { method = mc.Method; args = mc.Arguments; } else { var me = body as MemberExpression; if (me != null && me.Member is PropertyInfo) method = ((PropertyInfo)me.Member).GetGetMethod (); else return null; } return x => { try { return (U)method.Invoke (x, args.Select (a => GetExprValue (a, false)).ToArray()); } catch (TargetInvocationException ex) { throw ex.InnerException; } }; } static object GetExprValue (Expression expr, bool deferLocalInstanceProperty) { // Optimize the common simple cases, the first being a simple constant: var constant = expr as ConstantExpression; if (constant != null) return constant.Value; // The second common simple case is accessing a field in a closure: var me = expr as MemberExpression; if (me != null && me.Member is FieldInfo && me.Expression is ConstantExpression) return ((FieldInfo)me.Member).GetValue (((ConstantExpression)me.Expression).Value); // If we're referring to the LocalInstance property of the proxy, we need to defer its evaluation // until it's deserialized at the other end, as it will likely be null: if (deferLocalInstanceProperty && me != null && me.Member is PropertyInfo) { if (me.Member.Name == "LocalInstance" && me.Member.ReflectedType.IsGenericType && me.Member.ReflectedType.GetGenericTypeDefinition () == typeof(Proxy<>)) { return GetExprValue (me.Expression, true); } } // This will take longer: var objectMember = Expression.Convert (expr, typeof(object)); var getterLambda = Expression.Lambda> (objectMember); var getter = getterLambda.Compile (); return getter (); } } #endregion #region Proxy interface IProxy { /// Nongeneric version of the LocalInstance object LocalInstanceUntyped { get; } FastChannel Channel { get; } int? ObjectID { get; } byte DomainAddress { get; } void Disconnect (); Type ObjectType { get; } bool IsDisconnected { get; } /// Connects the proxy for channel implementors. Used by FastChannel. void RegisterLocal (FastChannel fastChannel, int? objectID, Action onDisconnect); } /// /// Wraps a reference to an object that potentially lives in another domain. This ensures that cross-domain calls are explicit in the /// source code, and allows for a transport other than .NET Remoting (e.g., FastChannel). /// class Proxy : IProxy where TRemote : class { /// Any reference-type object can be implicitly converted to a proxy. The proxy will become connected /// automatically when it's serialized during a remote method call. public static implicit operator Proxy (TRemote instance) { if (instance == null) return null; return new Proxy (instance); } readonly object _locker = new object(); /// The object being Remoted. This is populated only on the local side. public TRemote LocalInstance { get; private set; } /// This is populated if the proxy was obtained via FastChannel instead of Remoting. public FastChannel Channel { get; private set; } /// This is populated if the proxy was obtained via FastChannel instead of Remoting. It uniquely identifies the /// object within the channel if the object is connected (has a presence in the other domain). public int? ObjectID { get; private set; } /// Identifies the domain that owns the local instance, when FastChannel is in use. public byte DomainAddress { get; private set; } public bool IsDisconnected { get { return LocalInstance == null && Channel == null; } } Action _onDisconnect; readonly Type _actualObjectType; object IProxy.LocalInstanceUntyped { get { return LocalInstance; } } /// The real type of the object being proxied. This may be a subclass or derived implementation of TRemote. public Type ObjectType { get { return _actualObjectType ?? (LocalInstance != null ? LocalInstance.GetType () : typeof(TRemote)); } } Proxy (IProxy conversionSource, Action onDisconnect, Type actualObjectType) { LocalInstance = (TRemote)conversionSource.LocalInstanceUntyped; Channel = conversionSource.Channel; ObjectID = conversionSource.ObjectID; DomainAddress = conversionSource.DomainAddress; _onDisconnect = onDisconnect; _actualObjectType = actualObjectType; } public Proxy (TRemote instance) { LocalInstance = instance; } // Called via reflection: Proxy (TRemote instance, byte domainAddress) { LocalInstance = instance; DomainAddress = domainAddress; } // Called via reflection: Proxy (FastChannel channel, int objectID, byte domainAddress, Type actualInstanceType) { Channel = channel; ObjectID = objectID; DomainAddress = domainAddress; _actualObjectType = actualInstanceType; } public void AssertRemote () { if (LocalInstance != null) throw new InvalidOperationException ("Object " + LocalInstance.GetType ().Name + " is not remote"); } /// Runs a (void) method on the object being proxied. This works on both the local and remote side. public Task Run (Expression> remoteMethod) { var li = LocalInstance; if (li != null) try { var fastEval = FastChannel.FastEvalExpr (remoteMethod.Body); if (fastEval != null) fastEval (li); else remoteMethod.Compile () (li); return Task.FromResult (false); } catch (Exception ex) { return Task.FromException (ex); } return SendMethodCall (remoteMethod.Body, false); } /// Runs a (void) method on the object being proxied. This works on both the local and remote side. /// Use this overload for methods on the other domain that are themselves asynchronous. public Task Run (Expression> remoteMethod) { var li = LocalInstance; if (li != null) try { var fastEval = FastChannel.FastEvalExpr (remoteMethod.Body); if (fastEval != null) return fastEval (li); return remoteMethod.Compile () (li); } catch (Exception ex) { return Task.FromException (ex); } return SendMethodCall (remoteMethod.Body, true); } /// Runs a non-void method on the object being proxied. This works on both the local and remote side. public Task Eval(Expression> remoteMethod) { var li = LocalInstance; if (li != null) try { var fastEval = FastChannel.FastEvalExpr (remoteMethod.Body); if (fastEval != null) return Task.FromResult (fastEval (li)); return Task.FromResult (remoteMethod.Compile () (li)); } catch (Exception ex) { return Task.FromException (ex); } return SendMethodCall (remoteMethod.Body, false); } /// Runs a non-void method on the object being proxied. This works on both the local and remote side. /// Use this overload for methods on the other domain that are themselves asynchronous. public Task Eval(Expression>> remoteMethod) { var li = LocalInstance; if (li != null) try { var fastEval = FastChannel.FastEvalExpr> (remoteMethod.Body); if (fastEval != null) return fastEval (li); return remoteMethod.Compile () (li); } catch (Exception ex) { return Task.FromException (ex); } return SendMethodCall (remoteMethod.Body, true); } Task SendMethodCall(Expression expressionBody, bool awaitRemoteTask) { FastChannel channel; int? objectID; lock (_locker) { if (Channel == null) return Task.FromException (new InvalidOperationException ("Channel has been disposed on Proxy<" + typeof(TRemote).Name + "> " + expressionBody)); channel = Channel; objectID = ObjectID; } return channel.SendMethodCall (expressionBody, objectID.Value, awaitRemoteTask); } /// This is useful when this.ObjectType is a subclass or derivation of TRemote. public Proxy CastTo() where TNew : class, TRemote { if (!typeof(TNew).IsAssignableFrom (ObjectType)) throw new InvalidCastException ("Type '" + ObjectType.FullName + "' cannot be cast to '" + typeof(TNew).FullName + "'."); return new Proxy (this, _onDisconnect, _actualObjectType); } void IProxy.RegisterLocal (FastChannel fastChannel, int? objectID, Action onDisconnect) { // This is called by FastChannel to connect/register the proxy. lock (_locker) { Channel = fastChannel; ObjectID = objectID; DomainAddress = fastChannel.DomainAddress; _onDisconnect = onDisconnect; } } public void Disconnect () { Action onDisconnect; lock (_locker) { onDisconnect = _onDisconnect; _onDisconnect = null; } if (onDisconnect != null) onDisconnect(); // If the remote reference drops away, we should ensure that it gets release on the other domain as well: lock (_locker) { if (Channel == null || LocalInstance != null || ObjectID == null) LocalInstance = null; else Channel.InternalDeactivate (ObjectID.Value); Channel = null; ObjectID = null; } } ~Proxy () { if (_locker != null) try { Disconnect (); } catch (ObjectDisposedException) { } } } #endregion #region Pipe class SafeMemoryMappedFile : SafeDisposable { readonly MemoryMappedFile _mmFile; readonly MemoryMappedViewAccessor _accessor; unsafe byte* _pointer; public int Length { get; private set; } public MemoryMappedViewAccessor Accessor { get { AssertSafe(); return _accessor; } } public unsafe byte* Pointer { get { AssertSafe(); return _pointer; } } public unsafe SafeMemoryMappedFile (MemoryMappedFile mmFile) { _mmFile = mmFile; _accessor = _mmFile.CreateViewAccessor (); _pointer = (byte*)_accessor.SafeMemoryMappedViewHandle.DangerousGetHandle().ToPointer(); Length = (int)_accessor.Capacity; } unsafe protected override void DisposeCore() { base.DisposeCore(); _accessor.Dispose(); _mmFile.Dispose(); _pointer = null; } } abstract class MutexFreePipe : SafeDisposable { protected const int MinimumBufferSize = 0x10000; protected readonly int MessageHeaderLength = sizeof (int); protected readonly int StartingOffset = sizeof (int) + sizeof (bool); public readonly string Name; protected readonly EventWaitHandle NewMessageSignal; protected SafeMemoryMappedFile Buffer; protected int Offset, Length; protected MutexFreePipe (string name, bool createBuffer) { Name = name; var mmFile = createBuffer ? MemoryMappedFile.CreateNew (name + ".0", MinimumBufferSize, MemoryMappedFileAccess.ReadWrite) : MemoryMappedFile.OpenExisting (name + ".0"); Buffer = new SafeMemoryMappedFile (mmFile); NewMessageSignal = new EventWaitHandle (false, EventResetMode.AutoReset, name + ".signal"); Length = Buffer.Length; Offset = StartingOffset; } protected override void DisposeCore() { base.DisposeCore(); Buffer.Dispose(); NewMessageSignal.Dispose(); } } class OutPipe : MutexFreePipe { int _messageNumber; int _bufferCount; readonly List _oldBuffers = new List (); public int PendingBuffers => _oldBuffers.Count; public OutPipe (string name, bool createBuffer) : base (name, createBuffer) { } public unsafe void Write (byte [] data) { lock (DisposeLock) // If there are multiple threads, write just one message at a time { AssertSafe(); if (data.Length > Length - Offset - 8) { // Not enough space left in the shared memory buffer to write the message. WriteContinuation (data.Length); } WriteMessage (data); NewMessageSignal.Set(); // Signal reader that a message is available } } unsafe void WriteMessage (byte [] block) { byte* ptr = Buffer.Pointer; byte* offsetPointer = ptr + Offset; var msgPointer = (int*)offsetPointer; *msgPointer = block.Length; Offset += MessageHeaderLength; offsetPointer += MessageHeaderLength; if (block != null && block.Length > 0) { //MMF.Accessor.WriteArray (Offset, block, 0, block.Length); // Horribly slow. No. No. No. Marshal.Copy (block, 0, new IntPtr (offsetPointer), block.Length); Offset += block.Length; } // Write the latest message number to the start of the buffer: int* iptr = (int*)ptr; *iptr = ++_messageNumber; } void WriteContinuation (int messageSize) { // First, allocate a new buffer: string newName = Name + "." + ++_bufferCount; int newLength = Math.Max (messageSize * 10, MinimumBufferSize); var newFile = new SafeMemoryMappedFile (MemoryMappedFile.CreateNew (newName, newLength, MemoryMappedFileAccess.ReadWrite)); $"Allocated new buffer of {newLength} bytes".DumpTrace(); // Write a message to the old buffer indicating the address of the new buffer: WriteMessage (new byte [0]); // Keep the old buffer alive until the reader has indicated that it's seen it: _oldBuffers.Add (Buffer); // Make the new buffer current: Buffer = newFile; Length = newFile.Length; Offset = StartingOffset; // Release old buffers that have been read: foreach (var buffer in _oldBuffers.Take (_oldBuffers.Count - 1).ToArray()) lock (buffer.DisposeLock) if (!buffer.IsDisposed && buffer.Accessor.ReadBoolean (4)) { _oldBuffers.Remove (buffer); buffer.Dispose(); $"Cleaned file".DumpTrace(); } } protected override void DisposeCore() { base.DisposeCore(); foreach (var buffer in _oldBuffers) buffer.Dispose(); } } class InPipe : MutexFreePipe { int _lastMessageProcessed; int _bufferCount; readonly Action _onMessage; public InPipe (string name, bool createBuffer, Action onMessage) : base (name, createBuffer) { _onMessage = onMessage; new Thread (Go).Start(); } void Go() { int spinCycles = 0; while (true) { int? latestMessageID = GetLatestMessageID (); if (latestMessageID == null) return; // We've been disposed. if (latestMessageID > _lastMessageProcessed) { Thread.MemoryBarrier (); // We need this because of lock-free implementation byte [] msg = GetNextMessage (); if (msg == null) return; if (msg.Length > 0 && _onMessage != null) _onMessage (msg); // Zero-length msg will be a buffer continuation spinCycles = 1000; } if (spinCycles == 0) { NewMessageSignal.WaitOne(); if (IsDisposed) return; } else { Thread.MemoryBarrier (); // We need this because of lock-free implementation spinCycles--; } } } unsafe int? GetLatestMessageID () { lock (DisposeLock) lock (Buffer.DisposeLock) return IsDisposed || Buffer.IsDisposed ? (int?)null : *((int*)Buffer.Pointer); } unsafe byte [] GetNextMessage () { _lastMessageProcessed++; lock (DisposeLock) { if (IsDisposed) return null; lock (Buffer.DisposeLock) { if (Buffer.IsDisposed) return null; byte* offsetPointer = Buffer.Pointer + Offset; var msgPointer = (int*)offsetPointer; int msgLength = *msgPointer; Offset += MessageHeaderLength; offsetPointer += MessageHeaderLength; if (msgLength == 0) { Buffer.Accessor.Write (4, true); // Signal that we no longer need file Buffer.Dispose(); string newName = Name + "." + ++_bufferCount; Buffer = new SafeMemoryMappedFile (MemoryMappedFile.OpenExisting (newName)); Offset = StartingOffset; return new byte [0]; } Offset += msgLength; //MMF.Accessor.ReadArray (Offset, msg, 0, msg.Length); // too slow var msg = new byte [msgLength]; Marshal.Copy (new IntPtr (offsetPointer), msg, 0, msg.Length); return msg; } } } protected override void DisposeCore() { NewMessageSignal.Set(); base.DisposeCore(); } } #endregion #region SafeDisposable public class SafeDisposable : IDisposable { public object DisposeLock = new object(); public bool IsDisposed { get; private set; } public void Dispose() { lock (DisposeLock) if (!IsDisposed) { IsDisposed = true; DisposeCore(); } } protected virtual void DisposeCore() { } public void AssertSafe() { lock (DisposeLock) if (IsDisposed) throw new ObjectDisposedException (GetType().Name + " has been disposed"); } } #endregion #region Extensions static class Extensions { public static object GetUntypedResult (this Task t) { if (!t.GetType ().IsGenericType || t.GetType().GetGenericArguments().First().Name == "VoidTaskResult") return null; return ((dynamic)t).Result; } } #endregion