Decompiled source of System IO Pipelines v9.0.400
BepInEx/core/System.IO.Pipelines/netstandard2.0/System.IO.Pipelines.dll
Decompiled a month ago
The result has been truncated because of its large size; download the file to view the full contents.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Reflection;
using System.Resources;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Runtime.Versioning;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using FxResources.System.IO.Pipelines;
using Microsoft.CodeAnalysis;

// Assembly-level metadata as recovered by the decompiler.
[assembly: CompilationRelaxations(8)]
[assembly: RuntimeCompatibility(WrapNonExceptionThrows = true)]
[assembly: Debuggable(DebuggableAttribute.DebuggingModes.IgnoreSymbolStoreSequencePoints)]
[assembly: InternalsVisibleTo("System.IO.Pipelines.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001004b86c4cb78549b34bab61a3b1800e23bfeb5b3ec390074041536a7e3cbd97f5f04cf0f857155a8928eaa29ebfd11cfbbad3ba70efea7bda3226c6a8d370a4cd303f714486b6ebc225985a638471e6ef571cc92a4613c00b8fa65d61ccee0cbe5f36330c9a01f4183559f1bef24cc2917c6d913e3a541333a1d05d9bed22b38cb")]
[assembly: TargetFramework(".NETStandard,Version=v2.0", FrameworkDisplayName = ".NET Standard 2.0")]
[assembly: AssemblyMetadata("Serviceable", "True")]
[assembly: AssemblyMetadata("PreferInbox", "True")]
[assembly: AssemblyDefaultAlias("System.IO.Pipelines")]
[assembly: NeutralResourcesLanguage("en-US")]
[assembly: CLSCompliant(true)]
[assembly: AssemblyMetadata("IsTrimmable", "True")]
[assembly: DefaultDllImportSearchPaths(DllImportSearchPath.System32 | DllImportSearchPath.AssemblyDirectory)]
[assembly: AssemblyCompany("Microsoft Corporation")]
// The dump wrapped this literal onto two physical lines, which is not a valid regular
// string literal; the two halves are rejoined with the single space that separated them.
[assembly: AssemblyCopyright("© Microsoft Corporation. All rights reserved.")]
[assembly: AssemblyDescription("Single producer single consumer byte buffer management.\r\n\r\nCommonly Used Types:\r\nSystem.IO.Pipelines.Pipe\r\nSystem.IO.Pipelines.PipeWriter\r\nSystem.IO.Pipelines.PipeReader")]
[assembly: AssemblyFileVersion("9.0.425.16305")]
[assembly: AssemblyInformationalVersion("9.0.4+f57e6dc747158ab7ade4e62a75a6750d16b771e8")]
[assembly: AssemblyProduct("Microsoft® .NET")]
[assembly: AssemblyTitle("System.IO.Pipelines")]
[assembly: AssemblyMetadata("RepositoryUrl", "https://github.com/dotnet/runtime")]
[assembly: AssemblyVersion("9.0.0.0")]
[module: RefSafetyRules(11)]
[module: System.Runtime.CompilerServices.NullablePublicOnly(true)]

namespace Microsoft.CodeAnalysis
{
    /// <summary>Compiler-generated marker attribute applied to the embedded attribute types below.</summary>
    [CompilerGenerated]
    [Microsoft.CodeAnalysis.Embedded]
    internal sealed class EmbeddedAttribute : Attribute
    {
    }
}

namespace System.Runtime.CompilerServices
{
    /// <summary>Compiler-generated, embedded marker attribute with no members.</summary>
    [CompilerGenerated]
    [Microsoft.CodeAnalysis.Embedded]
    internal sealed class IsReadOnlyAttribute : Attribute
    {
    }

    /// <summary>Compiler-generated attribute carrying one or more nullability flag bytes.</summary>
    [CompilerGenerated]
    [Microsoft.CodeAnalysis.Embedded]
    [AttributeUsage(AttributeTargets.Class | AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Event | AttributeTargets.Parameter | AttributeTargets.ReturnValue | AttributeTargets.GenericParameter, AllowMultiple = false, Inherited = false)]
    internal sealed class NullableAttribute : Attribute
    {
        public readonly byte[] NullableFlags;

        /// <summary>Stores a single flag as a one-element array.</summary>
        public NullableAttribute(byte P_0)
        {
            NullableFlags = new byte[1] { P_0 };
        }

        /// <summary>Stores the supplied flag array as-is (no copy is made).</summary>
        public NullableAttribute(byte[] P_0)
        {
            NullableFlags = P_0;
        }
    }

    /// <summary>Compiler-generated attribute carrying a single nullability flag byte.</summary>
    [CompilerGenerated]
    [Microsoft.CodeAnalysis.Embedded]
    [AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct | AttributeTargets.Method | AttributeTargets.Interface | AttributeTargets.Delegate, AllowMultiple = false, Inherited = false)]
    internal sealed class NullableContextAttribute : Attribute
    {
        public readonly byte Flag;

        public NullableContextAttribute(byte P_0)
        {
            Flag = P_0;
        }
    }

    // The two attributes below belong to the declaration that follows this span.
    [CompilerGenerated]
    [Microsoft.CodeAnalysis.Embedded]
[AttributeUsage(AttributeTargets.Module, AllowMultiple = false, Inherited = false)]
    internal sealed class NullablePublicOnlyAttribute : Attribute
    {
        public readonly bool IncludesInternals;

        public NullablePublicOnlyAttribute(bool P_0)
        {
            IncludesInternals = P_0;
        }
    }

    /// <summary>Compiler-generated module attribute recording an integer version.</summary>
    [CompilerGenerated]
    [Microsoft.CodeAnalysis.Embedded]
    [AttributeUsage(AttributeTargets.Module, AllowMultiple = false, Inherited = false)]
    internal sealed class RefSafetyRulesAttribute : Attribute
    {
        public readonly int Version;

        public RefSafetyRulesAttribute(int P_0)
        {
            Version = P_0;
        }
    }
}

namespace FxResources.System.IO.Pipelines
{
    /// <summary>Empty marker type; it has no members and exists only to be referenced by type.</summary>
    internal static class SR
    {
    }
}

namespace System
{
    /// <summary>Accessors for the library's message strings, looked up by resource key.</summary>
    internal static class SR
    {
        private static readonly bool s_usingResourceKeys = GetUsingResourceKeysSwitchValue();

        private static ResourceManager s_resourceManager;

        // The decompiler printed `typeof(SR)`, which inside this class binds to System.SR itself
        // and would make the ResourceManager look for a "System.SR" resource set. The marker type is
        // named explicitly so the base name is "FxResources.System.IO.Pipelines.SR".
        // NOTE(review): presumably that is the embedded resource name - confirm against the assembly manifest.
        internal static ResourceManager ResourceManager =>
            s_resourceManager ?? (s_resourceManager = new ResourceManager(typeof(global::FxResources.System.IO.Pipelines.SR)));

        internal static string AdvanceToInvalidCursor => GetResourceString("AdvanceToInvalidCursor");

        internal static string ArgumentOutOfRange_NeedPosNum => GetResourceString("ArgumentOutOfRange_NeedPosNum");

        internal static string ConcurrentOperationsNotSupported => GetResourceString("ConcurrentOperationsNotSupported");

        internal static string FlushCanceledOnPipeWriter => GetResourceString("FlushCanceledOnPipeWriter");

        internal static string GetResultBeforeCompleted => GetResourceString("GetResultBeforeCompleted");

        internal static string InvalidExaminedOrConsumedPosition => GetResourceString("InvalidExaminedOrConsumedPosition");

        internal static string InvalidExaminedPosition => GetResourceString("InvalidExaminedPosition");

        internal static string InvalidZeroByteRead => GetResourceString("InvalidZeroByteRead");

        internal static string ObjectDisposed_StreamClosed => GetResourceString("ObjectDisposed_StreamClosed");

        internal static string NoReadingOperationToComplete => GetResourceString("NoReadingOperationToComplete");

        internal static string
NotSupported_UnreadableStream => GetResourceString("NotSupported_UnreadableStream");

        internal static string NotSupported_UnwritableStream => GetResourceString("NotSupported_UnwritableStream");

        internal static string ReadCanceledOnPipeReader => GetResourceString("ReadCanceledOnPipeReader");

        internal static string ReaderAndWriterHasToBeCompleted => GetResourceString("ReaderAndWriterHasToBeCompleted");

        internal static string ReadingAfterCompleted => GetResourceString("ReadingAfterCompleted");

        internal static string ReadingIsInProgress => GetResourceString("ReadingIsInProgress");

        internal static string WritingAfterCompleted => GetResourceString("WritingAfterCompleted");

        internal static string UnflushedBytesNotSupported => GetResourceString("UnflushedBytesNotSupported");

        /// <summary>
        /// Reads the "System.Resources.UseSystemResourceKeys" AppContext switch.
        /// Returns false when the switch has not been set.
        /// </summary>
        private static bool GetUsingResourceKeysSwitchValue()
        {
            return AppContext.TryGetSwitch("System.Resources.UseSystemResourceKeys", out bool isEnabled) && isEnabled;
        }

        /// <summary>Returns the switch value captured when this class was initialized.</summary>
        internal static bool UsingResourceKeys() => s_usingResourceKeys;

        /// <summary>
        /// Looks up <paramref name="resourceKey"/>. Returns the key itself when resource keys are in use,
        /// and null when the resource set cannot be found.
        /// </summary>
        private static string GetResourceString(string resourceKey)
        {
            if (UsingResourceKeys())
            {
                return resourceKey;
            }

            string resourceString = null;
            try
            {
                resourceString = ResourceManager.GetString(resourceKey);
            }
            catch (MissingManifestResourceException)
            {
                // Deliberately ignored: a missing resource set yields a null result.
            }

            return resourceString;
        }

        /// <summary>
        /// Looks up <paramref name="resourceKey"/> and falls back to <paramref name="defaultString"/>
        /// when the lookup produced null or merely echoed the key.
        /// </summary>
        private static string GetResourceString(string resourceKey, string defaultString)
        {
            string resourceString = GetResourceString(resourceKey);
            bool lookupSucceeded = resourceString != null && resourceKey != resourceString;
            return lookupSucceeded ? resourceString : defaultString;
        }

        /// <summary>Formats with one argument; in resource-key mode the parts are joined with ", " instead.</summary>
        internal static string Format(string resourceFormat, object? p1)
        {
            return UsingResourceKeys()
                ? string.Join(", ", resourceFormat, p1)
                : string.Format(resourceFormat, p1);
        }

        /// <summary>Formats with two arguments; in resource-key mode the parts are joined with ", " instead.</summary>
        internal static string Format(string resourceFormat, object? p1, object? p2)
        {
            return UsingResourceKeys()
                ? string.Join(", ", resourceFormat, p1, p2)
                : string.Format(resourceFormat, p1, p2);
        }

        internal static string Format(string resourceFormat, object?
p1, object? p2, object? p3)
        {
            return UsingResourceKeys()
                ? string.Join(", ", resourceFormat, p1, p2, p3)
                : string.Format(resourceFormat, p1, p2, p3);
        }

        /// <summary>
        /// Formats with an argument array. A null array returns the format string untouched;
        /// in resource-key mode the format and the arguments are joined with ", ".
        /// </summary>
        internal static string Format(string resourceFormat, params object?[]? args)
        {
            if (args == null)
            {
                return resourceFormat;
            }

            return UsingResourceKeys()
                ? resourceFormat + ", " + string.Join(", ", args)
                : string.Format(resourceFormat, args);
        }

        /// <summary>Provider-aware variant with one argument; the provider is not used in resource-key mode.</summary>
        internal static string Format(IFormatProvider? provider, string resourceFormat, object? p1)
        {
            return UsingResourceKeys()
                ? string.Join(", ", resourceFormat, p1)
                : string.Format(provider, resourceFormat, p1);
        }

        /// <summary>Provider-aware variant with two arguments; the provider is not used in resource-key mode.</summary>
        internal static string Format(IFormatProvider? provider, string resourceFormat, object? p1, object? p2)
        {
            return UsingResourceKeys()
                ? string.Join(", ", resourceFormat, p1, p2)
                : string.Format(provider, resourceFormat, p1, p2);
        }

        /// <summary>Provider-aware variant with three arguments; the provider is not used in resource-key mode.</summary>
        internal static string Format(IFormatProvider? provider, string resourceFormat, object? p1, object? p2, object? p3)
        {
            return UsingResourceKeys()
                ? string.Join(", ", resourceFormat, p1, p2, p3)
                : string.Format(provider, resourceFormat, p1, p2, p3);
        }

        internal static string Format(IFormatProvider? provider, string resourceFormat, params object?[]?
args)
        {
            // A null argument array means there is nothing to substitute.
            if (args == null)
            {
                return resourceFormat;
            }

            return UsingResourceKeys()
                ? resourceFormat + ", " + string.Join(", ", args)
                : string.Format(provider, resourceFormat, args);
        }
    }
}

namespace System.Diagnostics.CodeAnalysis
{
    // Internal copies of the nullable-analysis attributes; the parameterless ones are pure markers.

    [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Parameter, Inherited = false)]
    internal sealed class AllowNullAttribute : Attribute
    {
    }

    [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Parameter, Inherited = false)]
    internal sealed class DisallowNullAttribute : Attribute
    {
    }

    [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Parameter | AttributeTargets.ReturnValue, Inherited = false)]
    internal sealed class MaybeNullAttribute : Attribute
    {
    }

    [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Parameter | AttributeTargets.ReturnValue, Inherited = false)]
    internal sealed class NotNullAttribute : Attribute
    {
    }

    /// <summary>Parameter attribute that records the return value it is associated with.</summary>
    [AttributeUsage(AttributeTargets.Parameter, Inherited = false)]
    internal sealed class MaybeNullWhenAttribute : Attribute
    {
        public bool ReturnValue { get; }

        public MaybeNullWhenAttribute(bool returnValue) => ReturnValue = returnValue;
    }

    /// <summary>Parameter attribute that records the return value it is associated with.</summary>
    [AttributeUsage(AttributeTargets.Parameter, Inherited = false)]
    internal sealed class NotNullWhenAttribute : Attribute
    {
        public bool ReturnValue { get; }

        public NotNullWhenAttribute(bool returnValue) => ReturnValue = returnValue;
    }

    /// <summary>Attribute that records the name of a related parameter.</summary>
    [AttributeUsage(AttributeTargets.Property | AttributeTargets.Parameter | AttributeTargets.ReturnValue, AllowMultiple = true, Inherited = false)]
    internal sealed class NotNullIfNotNullAttribute : Attribute
    {
        public string ParameterName { get; }

        public NotNullIfNotNullAttribute(string parameterName) => ParameterName = parameterName;
    }

    [AttributeUsage(AttributeTargets.Method, Inherited = false)]
    internal sealed class DoesNotReturnAttribute : Attribute
    {
    }

    // The attribute below belongs to the declaration that follows this span.
    [AttributeUsage(AttributeTargets.Parameter, Inherited = false)]
internal sealed class DoesNotReturnIfAttribute : Attribute
    {
        public bool ParameterValue { get; }

        public DoesNotReturnIfAttribute(bool parameterValue) => ParameterValue = parameterValue;
    }

    /// <summary>Attribute listing one or more member names.</summary>
    [AttributeUsage(AttributeTargets.Method | AttributeTargets.Property, Inherited = false, AllowMultiple = true)]
    internal sealed class MemberNotNullAttribute : Attribute
    {
        public string[] Members { get; }

        /// <summary>Wraps a single member name in a one-element array.</summary>
        public MemberNotNullAttribute(string member) => Members = new[] { member };

        /// <summary>Stores the supplied array of member names as-is.</summary>
        public MemberNotNullAttribute(params string[] members) => Members = members;
    }

    /// <summary>Attribute listing one or more member names together with an associated return value.</summary>
    [AttributeUsage(AttributeTargets.Method | AttributeTargets.Property, Inherited = false, AllowMultiple = true)]
    internal sealed class MemberNotNullWhenAttribute : Attribute
    {
        public bool ReturnValue { get; }

        public string[] Members { get; }

        /// <summary>Wraps a single member name in a one-element array.</summary>
        public MemberNotNullWhenAttribute(bool returnValue, string member)
        {
            ReturnValue = returnValue;
            Members = new[] { member };
        }

        /// <summary>Stores the supplied array of member names as-is.</summary>
        public MemberNotNullWhenAttribute(bool returnValue, params string[] members)
        {
            ReturnValue = returnValue;
            Members = members;
        }
    }
}

namespace System.Runtime.InteropServices
{
    /// <summary>Internal method-level attribute that records a library name and marshalling options.</summary>
    [AttributeUsage(AttributeTargets.Method, AllowMultiple = false, Inherited = false)]
    internal sealed class LibraryImportAttribute : Attribute
    {
        public string LibraryName { get; }

        public string? EntryPoint { get; set; }

        public StringMarshalling StringMarshalling { get; set; }

        public Type?
StringMarshallingCustomType { get; set; }

        public bool SetLastError { get; set; }

        public LibraryImportAttribute(string libraryName) => LibraryName = libraryName;
    }

    internal enum StringMarshalling
    {
        Custom,
        Utf8,
        Utf16
    }
}

namespace System.Threading
{
    internal static class CancellationTokenExtensions
    {
        /// <summary>Forwards directly to <c>CancellationToken.Register(callback, state)</c>.</summary>
        internal static CancellationTokenRegistration UnsafeRegister(this CancellationToken cancellationToken, Action<object> callback, object state)
            => cancellationToken.Register(callback, state);
    }
}

namespace System.Threading.Tasks
{
    /// <summary>Helpers that expose a <see cref="Task"/> through the Begin/End <see cref="IAsyncResult"/> pattern.</summary>
    internal static class TaskToAsyncResult
    {
        /// <summary><see cref="IAsyncResult"/> wrapper around a task, with an optional completion callback.</summary>
        private sealed class TaskAsyncResult : IAsyncResult
        {
            internal readonly Task _task;

            private readonly AsyncCallback _callback;

            public object AsyncState { get; }

            public bool CompletedSynchronously { get; }

            public bool IsCompleted => _task.IsCompleted;

            public WaitHandle AsyncWaitHandle => ((IAsyncResult)_task).AsyncWaitHandle;

            internal TaskAsyncResult(Task task, object state, AsyncCallback callback)
            {
                _task = task;
                AsyncState = state;

                if (task.IsCompleted)
                {
                    // Already finished: flag it and invoke the callback inline.
                    CompletedSynchronously = true;
                    callback?.Invoke(this);
                    return;
                }

                if (callback == null)
                {
                    return;
                }

                // Still running: invoke the callback once the task completes, without capturing the context.
                _callback = callback;
                _task.ConfigureAwait(continueOnCapturedContext: false).GetAwaiter().OnCompleted(() => _callback(this));
            }
        }

        /// <summary>Wraps <paramref name="task"/> in an <see cref="IAsyncResult"/>.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        public static IAsyncResult Begin(Task task, AsyncCallback? callback, object? state)
        {
            if (task == null)
            {
                throw new ArgumentNullException("task");
            }

            return new TaskAsyncResult(task, state, callback);
        }

        /// <summary>Blocks until the wrapped task finishes, propagating its exception if it failed.</summary>
        public static void End(IAsyncResult asyncResult) => Unwrap(asyncResult).GetAwaiter().GetResult();

        /// <summary>Blocks until the wrapped task finishes and returns its result.</summary>
        public static TResult End<TResult>(IAsyncResult asyncResult) => Unwrap<TResult>(asyncResult).GetAwaiter().GetResult();

        /// <summary>Extracts the task from a result produced by <see cref="Begin"/>.</summary>
        public static Task Unwrap(IAsyncResult asyncResult)
        {
            if (asyncResult == null)
            {
                throw new ArgumentNullException("asyncResult");
            }

            return (asyncResult as TaskAsyncResult)?._task ??
throw new ArgumentException(null, "asyncResult");
        }

        /// <summary>Extracts the typed task from a result produced by <c>Begin</c>.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
        /// <exception cref="ArgumentException">The result does not wrap a <c>Task&lt;TResult&gt;</c>.</exception>
        public static Task<TResult> Unwrap<TResult>(IAsyncResult asyncResult)
        {
            if (asyncResult == null)
            {
                throw new ArgumentNullException("asyncResult");
            }

            TaskAsyncResult wrapper = asyncResult as TaskAsyncResult;
            if (wrapper?._task is Task<TResult> typedTask)
            {
                return typedTask;
            }

            throw new ArgumentException(null, "asyncResult");
        }
    }
}

namespace System.IO
{
    internal static class StreamHelpers
    {
        /// <summary>
        /// Validates the arguments of a stream copy. A stream that can neither be read nor written
        /// is reported as closed; otherwise the source must be readable and the destination writable.
        /// </summary>
        public static void ValidateCopyToArgs(Stream source, Stream destination, int bufferSize)
        {
            if (destination == null)
            {
                throw new ArgumentNullException("destination");
            }

            if (bufferSize <= 0)
            {
                throw new ArgumentOutOfRangeException("bufferSize", bufferSize, System.SR.ArgumentOutOfRange_NeedPosNum);
            }

            bool sourceReadable = source.CanRead;
            if (!(sourceReadable || source.CanWrite))
            {
                throw new ObjectDisposedException(null, System.SR.ObjectDisposed_StreamClosed);
            }

            bool destinationWritable = destination.CanWrite;
            if (!(destinationWritable || destination.CanRead))
            {
                throw new ObjectDisposedException("destination", System.SR.ObjectDisposed_StreamClosed);
            }

            if (!sourceReadable)
            {
                throw new NotSupportedException(System.SR.NotSupported_UnreadableStream);
            }

            if (!destinationWritable)
            {
                throw new NotSupportedException(System.SR.NotSupported_UnwritableStream);
            }
        }
    }
}

namespace System.IO.Pipelines
{
    /// <summary>Sequence segment backed by either an <c>IMemoryOwner&lt;byte&gt;</c> or a pooled byte array.</summary>
    internal sealed class BufferSegment : ReadOnlySequenceSegment<byte>
    {
        private IMemoryOwner<byte> _memoryOwner;

        private byte[] _array;

        private BufferSegment _next;

        private int _end;

        /// <summary>Number of bytes in use; setting it re-slices the exposed memory to that length.</summary>
        public int End
        {
            get => _end;
            set
            {
                _end = value;
                base.Memory = AvailableMemory.Slice(0, value);
            }
        }

        /// <summary>Typed view of the next segment; the setter keeps the base <c>Next</c> link in sync.</summary>
        public BufferSegment? NextSegment
        {
            get => _next;
            set
            {
                base.Next = value;
                _next = value;
            }
        }

        internal object? MemoryOwner => ((object)_memoryOwner) ??
((object)_array);

        /// <summary>The full block of memory currently attached to this segment.</summary>
        public Memory<byte> AvailableMemory { get; private set; }

        public int Length => End;

        /// <summary>Bytes of <see cref="AvailableMemory"/> that lie beyond <see cref="End"/>.</summary>
        public int WritableBytes
        {
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            get => AvailableMemory.Length - End;
        }

        /// <summary>Attaches memory obtained from a memory owner.</summary>
        public void SetOwnedMemory(IMemoryOwner<byte> memoryOwner)
        {
            _memoryOwner = memoryOwner;
            AvailableMemory = memoryOwner.Memory;
        }

        /// <summary>Attaches a byte array; <see cref="ResetMemory"/> hands it to <c>ArrayPool&lt;byte&gt;.Shared</c>.</summary>
        public void SetOwnedMemory(byte[] arrayPoolBuffer)
        {
            _array = arrayPoolBuffer;
            AvailableMemory = arrayPoolBuffer;
        }

        /// <summary>Releases the memory and clears the links and running index.</summary>
        public void Reset()
        {
            ResetMemory();
            base.Next = null;
            base.RunningIndex = 0L;
            _next = null;
        }

        /// <summary>Disposes the memory owner, or returns the array to the shared pool, then clears the memory views.</summary>
        public void ResetMemory()
        {
            IMemoryOwner<byte> owner = _memoryOwner;
            if (owner == null)
            {
                ArrayPool<byte>.Shared.Return(_array);
                _array = null;
            }
            else
            {
                _memoryOwner = null;
                owner.Dispose();
            }

            base.Memory = default(ReadOnlyMemory<byte>);
            _end = 0;
            AvailableMemory = default(Memory<byte>);
        }

        /// <summary>Links <paramref name="segment"/> after this one and recomputes the running index of every following segment.</summary>
        public void SetNext(BufferSegment segment)
        {
            NextSegment = segment;

            for (BufferSegment current = this; current.Next != null; current = current.NextSegment)
            {
                current.NextSegment.RunningIndex = current.RunningIndex + current.Length;
            }
        }

        /// <summary>Distance in bytes between two (segment, index) positions.</summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        internal static long GetLength(BufferSegment startSegment, int startIndex, BufferSegment endSegment, int endIndex)
        {
            long endPosition = endSegment.RunningIndex + (uint)endIndex;
            long beginPosition = startSegment.RunningIndex + (uint)startIndex;
            return endPosition - beginPosition;
        }

        /// <summary>Distance in bytes between an absolute start position and a (segment, index) end position.</summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        internal static long GetLength(long startPosition, BufferSegment endSegment, int endIndex)
        {
            long endPosition = endSegment.RunningIndex + (uint)endIndex;
            return endPosition - startPosition;
        }
    }

    /// <summary>Immutable bundle of a callback, its state, and the contexts captured for it.</summary>
    internal readonly struct CompletionData
    {
        public Action<object?> Completion { get; }

        public object? CompletionState { get; }

        public ExecutionContext? ExecutionContext { get; }

        public SynchronizationContext? SynchronizationContext { get; }

        public CompletionData(Action<object?> completion, object? completionState, ExecutionContext? executionContext, SynchronizationContext?
synchronizationContext)
        {
            Completion = completion;
            CompletionState = completionState;
            ExecutionContext = executionContext;
            SynchronizationContext = synchronizationContext;
        }
    }

    /// <summary>Outcome of a flush, stored as a pair of flags.</summary>
    public struct FlushResult
    {
        internal ResultFlags _resultFlags;

        public bool IsCanceled => (_resultFlags & ResultFlags.Canceled) != 0;

        public bool IsCompleted => (_resultFlags & ResultFlags.Completed) != 0;

        public FlushResult(bool isCanceled, bool isCompleted)
        {
            ResultFlags flags = ResultFlags.None;

            if (isCanceled)
            {
                flags |= ResultFlags.Canceled;
            }

            if (isCompleted)
            {
                flags |= ResultFlags.Completed;
            }

            _resultFlags = flags;
        }
    }

    /// <summary>Scheduler that runs the action immediately on the calling thread.</summary>
    internal sealed class InlineScheduler : PipeScheduler
    {
        public override void Schedule(Action<object?> action, object? state) => action(state);

        internal override void UnsafeSchedule(Action<object?> action, object? state) => action(state);
    }

    /// <summary>A pair of a reader and a writer.</summary>
    public interface IDuplexPipe
    {
        PipeReader Input { get; }

        PipeWriter Output { get; }
    }

    /// <summary>Array-backed stack of <see cref="BufferSegment"/> instances.</summary>
    internal struct BufferSegmentStack
    {
        /// <summary>Struct wrapper around a segment reference; the backing array stores these instead of raw references.</summary>
        private readonly struct SegmentAsValueType
        {
            private readonly BufferSegment _value;

            private SegmentAsValueType(BufferSegment value) => _value = value;

            public static implicit operator SegmentAsValueType(BufferSegment s) => new SegmentAsValueType(s);

            public static implicit operator BufferSegment(SegmentAsValueType s) => s._value;
        }

        private SegmentAsValueType[] _array;

        private int _size;

        public int Count => _size;

        /// <summary>Creates an empty stack with room for <paramref name="size"/> segments.</summary>
        public BufferSegmentStack(int size)
        {
            _array = new SegmentAsValueType[size];
            _size = 0;
        }

        public bool TryPop([NotNullWhen(true)] out BufferSegment?
result)
        {
            int topIndex = _size - 1;
            SegmentAsValueType[] array = _array;

            // One unsigned comparison rejects both an empty stack (topIndex == -1) and an out-of-range index.
            if ((uint)topIndex >= (uint)array.Length)
            {
                result = null;
                return false;
            }

            _size = topIndex;
            result = array[topIndex];
            array[topIndex] = default(SegmentAsValueType); // clear the slot so the stack no longer references the segment
            return true;
        }

        /// <summary>Pushes <paramref name="item"/>, growing the backing array when it is full.</summary>
        public void Push(BufferSegment item)
        {
            int size = _size;
            SegmentAsValueType[] array = _array;

            if ((uint)size < (uint)array.Length)
            {
                array[size] = item;
                _size = size + 1;
            }
            else
            {
                PushWithResize(item);
            }
        }

        /// <summary>Slow path of <see cref="Push"/>: doubles the backing array and stores the item.</summary>
        [MethodImpl(MethodImplOptions.NoInlining)]
        private void PushWithResize(BufferSegment item)
        {
            // Doubling a zero-length array still yields zero capacity, and the store below would then
            // throw IndexOutOfRangeException; always grow to at least one slot.
            int newCapacity = Math.Max(1, 2 * _array.Length);
            Array.Resize(ref _array, newCapacity);
            _array[_size] = item;
            _size++;
        }
    }

    public sealed class Pipe
    {
        /// <summary>Reader facade; every member forwards to the owning <see cref="Pipe"/>.</summary>
        private sealed class DefaultPipeReader : PipeReader, IValueTaskSource<ReadResult>
        {
            private readonly Pipe _pipe;

            public DefaultPipeReader(Pipe pipe) => _pipe = pipe;

            public override bool TryRead(out ReadResult result) => _pipe.TryRead(out result);

            public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken))
                => _pipe.ReadAsync(cancellationToken);

            protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumBytes, CancellationToken cancellationToken)
                => _pipe.ReadAtLeastAsync(minimumBytes, cancellationToken);

            public override void AdvanceTo(SequencePosition consumed) => _pipe.AdvanceReader(in consumed);

            public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
                => _pipe.AdvanceReader(in consumed, in examined);

            public override void CancelPendingRead() => _pipe.CancelPendingRead();

            public override void Complete(Exception exception = null) => _pipe.CompleteReader(exception);

            public override void OnWriterCompleted(Action<Exception, object> callback, object state)
                => _pipe.OnWriterCompleted(callback, state);

            // The token argument is not used; status and result come straight from the pipe.
            public ValueTaskSourceStatus GetStatus(short token) => _pipe.GetReadAsyncStatus();

            public ReadResult GetResult(short token) => _pipe.GetReadAsyncResult();

            public void OnCompleted(Action<object> continuation,
object state, short token, ValueTaskSourceOnCompletedFlags flags)
            {
                _pipe.OnReadAsyncCompleted(continuation, state, flags);
            }
        }

        /// <summary>Writer facade; every member forwards to the owning <see cref="Pipe"/>.</summary>
        private sealed class DefaultPipeWriter : PipeWriter, IValueTaskSource<FlushResult>
        {
            private readonly Pipe _pipe;

            public override bool CanGetUnflushedBytes => true;

            public override long UnflushedBytes => _pipe.GetUnflushedBytes();

            public DefaultPipeWriter(Pipe pipe) => _pipe = pipe;

            public override void Complete(Exception exception = null) => _pipe.CompleteWriter(exception);

            public override void CancelPendingFlush() => _pipe.CancelPendingFlush();

            public override void OnReaderCompleted(Action<Exception, object> callback, object state)
                => _pipe.OnReaderCompleted(callback, state);

            public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
                => _pipe.FlushAsync(cancellationToken);

            public override void Advance(int bytes) => _pipe.Advance(bytes);

            public override Memory<byte> GetMemory(int sizeHint = 0) => _pipe.GetMemory(sizeHint);

            public override Span<byte> GetSpan(int sizeHint = 0) => _pipe.GetSpan(sizeHint);

            // The token argument is not used; status and result come straight from the pipe.
            public ValueTaskSourceStatus GetStatus(short token) => _pipe.GetFlushAsyncStatus();

            public FlushResult GetResult(short token) => _pipe.GetFlushAsyncResult();

            public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
                => _pipe.OnFlushAsyncCompleted(continuation, state, flags);

            public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
                => _pipe.WriteAsync(source, cancellationToken);
        }

        // Cached delegates passed to PipeAwaitable.BeginOperation; the state object is the pipe itself.
        private static readonly Action<object> s_signalReaderAwaitable = state => ((Pipe)state).ReaderCancellationRequested();

        private static readonly Action<object> s_signalWriterAwaitable = state => ((Pipe)state).WriterCancellationRequested();

        private static readonly
Action<object> s_invokeCompletionCallbacks = state => ((PipeCompletionCallbacks)state).Execute();

        // Cached delegates bound to the static ExecuteWith/WithoutExecutionContext methods.
        private static readonly ContextCallback s_executionContextRawCallback = ExecuteWithoutExecutionContext;

        private static readonly SendOrPostCallback s_syncContextExecutionContextCallback = ExecuteWithExecutionContext;

        private static readonly SendOrPostCallback s_syncContextExecuteWithoutExecutionContextCallback = ExecuteWithoutExecutionContext;

        private static readonly Action<object> s_scheduleWithExecutionContextCallback = ExecuteWithExecutionContext;

        // Pool of recycled BufferSegment instances.
        private BufferSegmentStack _bufferSegmentPool;

        private readonly DefaultPipeReader _reader;

        private readonly DefaultPipeWriter _writer;

        private readonly PipeOptions _options;

        // Lock object, exposed to the rest of the class through SyncObj.
        private readonly object _sync = new object();

        // Increased by committed bytes, decreased as the reader advances its examined position.
        private long _unconsumedBytes;

        // Bytes advanced by the writer since the last commit.
        private long _unflushedBytes;

        private PipeAwaitable _readerAwaitable;

        private PipeAwaitable _writerAwaitable;

        private PipeCompletion _writerCompletion;

        private PipeCompletion _readerCompletion;

        // Starts at -1 and is set to 0 when the first segment is allocated.
        private long _lastExaminedIndex = -1;

        private BufferSegment _readHead;

        private int _readHeadIndex;

        private bool _disposed;

        private BufferSegment _readTail;

        private int _readTailIndex;

        private int _minimumReadBytes;

        private BufferSegment _writingHead;

        private Memory<byte> _writingHeadMemory;

        private int _writingHeadBytesBuffered;

        private PipeOperationState _operationState;

        // Shortcuts into the options object.
        private bool UseSynchronizationContext => _options.UseSynchronizationContext;

        private int MinimumSegmentSize => _options.MinimumSegmentSize;

        private long PauseWriterThreshold => _options.PauseWriterThreshold;

        private long ResumeWriterThreshold => _options.ResumeWriterThreshold;

        private PipeScheduler ReaderScheduler => _options.ReaderScheduler;

        private PipeScheduler WriterScheduler => _options.WriterScheduler;

        private object SyncObj => _sync;

        internal long Length => _unconsumedBytes;

        public PipeReader Reader => _reader;

        public PipeWriter Writer => _writer;

        /// <summary>Creates a pipe using <c>PipeOptions.Default</c>.</summary>
        public Pipe()
            : this(PipeOptions.Default)
        {
        }
public Pipe(PipeOptions options)
        {
            if (options == null)
            {
                ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
            }

            _bufferSegmentPool = new BufferSegmentStack(options.InitialSegmentPoolSize);
            _operationState = default;
            _readerCompletion = default;
            _writerCompletion = default;

            // _options must be assigned before UseSynchronizationContext is read below.
            _options = options;

            // The reader starts out with nothing to read; the writer starts out free to flush.
            _readerAwaitable = new PipeAwaitable(completed: false, UseSynchronizationContext);
            _writerAwaitable = new PipeAwaitable(completed: true, UseSynchronizationContext);
            _reader = new DefaultPipeReader(this);
            _writer = new DefaultPipeWriter(this);
        }

        /// <summary>Restores completions, awaitables, indices and byte counters to their initial values.</summary>
        private void ResetState()
        {
            _readerCompletion.Reset();
            _writerCompletion.Reset();
            _readerAwaitable = new PipeAwaitable(completed: false, UseSynchronizationContext);
            _writerAwaitable = new PipeAwaitable(completed: true, UseSynchronizationContext);
            _readTailIndex = 0;
            _readHeadIndex = 0;
            _lastExaminedIndex = -1;
            _unflushedBytes = 0;
            _unconsumedBytes = 0;
        }

        /// <summary>Returns writable memory, allocating a new write head when the current one is missing or smaller than <paramref name="sizeHint"/>.</summary>
        internal Memory<byte> GetMemory(int sizeHint)
        {
            if (_writerCompletion.IsCompleted)
            {
                ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
            }

            if (sizeHint < 0)
            {
                ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
            }

            AllocateWriteHeadIfNeeded(sizeHint);
            return _writingHeadMemory;
        }

        /// <summary>Span-returning counterpart of <see cref="GetMemory"/>; performs the same validation.</summary>
        internal Span<byte> GetSpan(int sizeHint)
        {
            if (_writerCompletion.IsCompleted)
            {
                ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
            }

            if (sizeHint < 0)
            {
                ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
            }

            AllocateWriteHeadIfNeeded(sizeHint);
            return _writingHeadMemory.Span;
        }

        /// <summary>Takes the locked slow path unless a write is active and the current head memory is non-empty and large enough.</summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private void AllocateWriteHeadIfNeeded(int sizeHint)
        {
            bool currentHeadSuffices = _operationState.IsWritingActive
                && _writingHeadMemory.Length != 0
                && _writingHeadMemory.Length >= sizeHint;

            if (!currentHeadSuffices)
            {
                AllocateWriteHeadSynchronized(sizeHint);
            }
        }

        private void AllocateWriteHeadSynchronized(int sizeHint)
        {
            lock (SyncObj)
            {
                _operationState.BeginWrite();
                if (_writingHead == null)
                {
BufferSegment readTail = AllocateSegment(sizeHint);
                    // First segment: it is simultaneously the write head, read head and read tail.
                    _writingHead = (_readHead = (_readTail = readTail));
                    _lastExaminedIndex = 0L;
                    return;
                }
                int length = _writingHeadMemory.Length;
                if (length == 0 || length < sizeHint)
                {
                    // Fold bytes written so far into the segment before switching memory.
                    if (_writingHeadBytesBuffered > 0)
                    {
                        _writingHead.End += _writingHeadBytesBuffered;
                        _writingHeadBytesBuffered = 0;
                    }
                    if (_writingHead.Length == 0)
                    {
                        // Nothing stored in the head segment: swap its memory in place.
                        _writingHead.ResetMemory();
                        RentMemory(_writingHead, sizeHint);
                    }
                    else
                    {
                        // The head holds data: chain a fresh segment after it.
                        BufferSegment bufferSegment = AllocateSegment(sizeHint);
                        _writingHead.SetNext(bufferSegment);
                        _writingHead = bufferSegment;
                    }
                }
            }
        }

        /// <summary>Obtains a segment (pooled or new) and attaches rented memory to it.</summary>
        private BufferSegment AllocateSegment(int sizeHint)
        {
            BufferSegment segment = CreateSegmentUnsynchronized();
            RentMemory(segment, sizeHint);
            return segment;
        }

        /// <summary>
        /// Attaches memory to <paramref name="segment"/> and makes it the current writing memory.
        /// The configured pool is used when it is not the default shared pool and the hint fits its
        /// maximum buffer size; otherwise an array is rented from <c>ArrayPool&lt;byte&gt;.Shared</c>.
        /// </summary>
        private void RentMemory(BufferSegment segment, int sizeHint)
        {
            MemoryPool<byte> customPool = null;
            int maxPoolBufferSize = -1; // -1 sends every size hint to the array pool branch

            if (!_options.IsDefaultSharedMemoryPool)
            {
                customPool = _options.Pool;
                maxPoolBufferSize = customPool.MaxBufferSize;
            }

            if (sizeHint <= maxPoolBufferSize)
            {
                IMemoryOwner<byte> owned = customPool.Rent(GetSegmentSize(sizeHint, maxPoolBufferSize));
                segment.SetOwnedMemory(owned);
            }
            else
            {
                byte[] rented = ArrayPool<byte>.Shared.Rent(GetSegmentSize(sizeHint));
                segment.SetOwnedMemory(rented);
            }

            _writingHeadMemory = segment.AvailableMemory;
        }

        /// <summary>Raises the hint to at least <c>MinimumSegmentSize</c>, then caps it at <paramref name="maxBufferSize"/>.</summary>
        private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
        {
            return Math.Min(maxBufferSize, Math.Max(MinimumSegmentSize, sizeHint));
        }

        /// <summary>Pops a pooled segment, or creates a new one when the pool is empty.</summary>
        private BufferSegment CreateSegmentUnsynchronized()
        {
            return _bufferSegmentPool.TryPop(out BufferSegment pooled) ? pooled : new BufferSegment();
        }

        /// <summary>Returns the segment to the pool unless the pool already holds <c>MaxSegmentPoolSize</c> entries.</summary>
        private void ReturnSegmentUnsynchronized(BufferSegment segment)
        {
            if (_bufferSegmentPool.Count >= _options.MaxSegmentPoolSize)
            {
                return;
            }

            _bufferSegmentPool.Push(segment);
        }

        internal bool CommitUnsynchronized()
        {
            _operationState.EndWrite();
            if (_unflushedBytes == 0L)
            {
                return false;
            }
            _writingHead.End += _writingHeadBytesBuffered;
            _readTail = _writingHead;
            _readTailIndex = _writingHead.End;
            long unconsumedBytes = _unconsumedBytes;
_unconsumedBytes += _unflushedBytes;
            bool result = true;
            if (_unconsumedBytes < _minimumReadBytes)
            {
                result = false;
            }
            else if (PauseWriterThreshold > 0 && unconsumedBytes < PauseWriterThreshold && _unconsumedBytes >= PauseWriterThreshold && !_readerCompletion.IsCompleted)
            {
                // This commit crossed the pause threshold while the reader is still active.
                _writerAwaitable.SetUncompleted();
            }
            _unflushedBytes = 0L;
            _writingHeadBytesBuffered = 0;
            return result;
        }

        /// <summary>Records <paramref name="bytes"/> as written; nothing is recorded once the reader has completed.</summary>
        internal void Advance(int bytes)
        {
            lock (SyncObj)
            {
                // The unsigned comparison also rejects negative values.
                if ((uint)bytes > (uint)_writingHeadMemory.Length)
                {
                    ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
                }

                if (_readerCompletion.IsCompleted)
                {
                    return;
                }

                AdvanceCore(bytes);
            }
        }

        /// <summary>Updates the byte counters and slices the written part off the head memory.</summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private void AdvanceCore(int bytesWritten)
        {
            _unflushedBytes += bytesWritten;
            _writingHeadBytesBuffered += bytesWritten;
            _writingHeadMemory = _writingHeadMemory.Slice(bytesWritten);
        }

        /// <summary>Commits written bytes and schedules the reader continuation; returns a canceled task if the token is already canceled.</summary>
        internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                Task<FlushResult> canceled = Task.FromCanceled<FlushResult>(cancellationToken);
                return new ValueTask<FlushResult>(canceled);
            }

            CompletionData readerContinuation;
            ValueTask<FlushResult> flushTask;
            lock (SyncObj)
            {
                PrepareFlushUnsynchronized(out readerContinuation, out flushTask, cancellationToken);
            }

            // Scheduled after the lock has been released.
            TrySchedule(ReaderScheduler, in readerContinuation);
            return flushTask;
        }

        /// <summary>Commits, begins the writer operation and produces both the flush task and the reader continuation.</summary>
        private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
        {
            bool wakeReader = CommitUnsynchronized();
            _writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);

            if (_writerAwaitable.IsCompleted)
            {
                // Synchronous completion: hand back the result directly.
                FlushResult flushResult = default;
                GetFlushResult(ref flushResult);
                result = new ValueTask<FlushResult>(flushResult);
            }
            else
            {
                // Pending: the writer object acts as the value task source.
                result = new ValueTask<FlushResult>(_writer, 0);
            }

            if (!wakeReader)
            {
                completionData = default;
            }
            else
            {
                _readerAwaitable.Complete(out completionData);
            }
        }

        internal void CompleteWriter(Exception?
exception)
        {
            PipeCompletionCallbacks callbacks;
            CompletionData readerContinuation;
            bool readerAlreadyCompleted;

            lock (SyncObj)
            {
                // Commit pending bytes, mark the writer complete and complete the reader awaitable.
                CommitUnsynchronized();
                callbacks = _writerCompletion.TryComplete(exception);
                _readerAwaitable.Complete(out readerContinuation);
                readerAlreadyCompleted = _readerCompletion.IsCompleted;
            }

            if (readerAlreadyCompleted)
            {
                CompletePipe();
            }

            if (callbacks != null)
            {
                ScheduleCallbacks(ReaderScheduler, callbacks);
            }

            TrySchedule(ReaderScheduler, in readerContinuation);
        }

        /// <summary>Advances with the examined position equal to the consumed position.</summary>
        internal void AdvanceReader(in SequencePosition consumed)
        {
            AdvanceReader(in consumed, in consumed);
        }

        /// <summary>Unpacks both positions into (segment, index) pairs and forwards to the private overload.</summary>
        internal void AdvanceReader(in SequencePosition consumed, in SequencePosition examined)
        {
            if (_readerCompletion.IsCompleted)
            {
                ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
            }

            BufferSegment consumedSegment = (BufferSegment)consumed.GetObject();
            int consumedIndex = consumed.GetInteger();
            BufferSegment examinedSegment = (BufferSegment)examined.GetObject();
            int examinedIndex = examined.GetInteger();

            AdvanceReader(consumedSegment, consumedIndex, examinedSegment, examinedIndex);
        }

        private void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
        {
            // The examined position must not precede the consumed position.
            if (consumedSegment != null && examinedSegment != null && BufferSegment.GetLength(consumedSegment, consumedIndex, examinedSegment, examinedIndex) < 0)
            {
                ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
            }
            BufferSegment bufferSegment = null;
            BufferSegment returnEnd = null;
            CompletionData completionData = default(CompletionData);
            lock (SyncObj)
            {
                // flag: the examined position is exactly the current read tail.
                bool flag = false;
                if (examinedSegment == _readTail)
                {
                    flag = examinedIndex == _readTailIndex;
                }
                if (examinedSegment != null && _lastExaminedIndex >= 0)
                {
                    long length = BufferSegment.GetLength(_lastExaminedIndex, examinedSegment, examinedIndex);
                    long unconsumedBytes = _unconsumedBytes;
                    if (length < 0)
                    {
                        ThrowHelper.ThrowInvalidOperationException_InvalidExaminedPosition();
                    }
                    _unconsumedBytes -= length;
                    _lastExaminedIndex = examinedSegment.RunningIndex + examinedIndex;
                    if (unconsumedBytes >= ResumeWriterThreshold && _unconsumedBytes <
ResumeWriterThreshold) { _writerAwaitable.Complete(out completionData); } } if (consumedSegment != null) { if (_readHead == null) { ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor(); return; } bufferSegment = _readHead; returnEnd = consumedSegment; if (consumedIndex == returnEnd.Length) { if (_writingHead != returnEnd) { MoveReturnEndToNextBlock(); } else if (_writingHeadBytesBuffered == 0 && !_operationState.IsWritingActive) { _writingHead = null; _writingHeadMemory = default(Memory<byte>); MoveReturnEndToNextBlock(); } else { _readHead = consumedSegment; _readHeadIndex = consumedIndex; } } else { _readHead = consumedSegment; _readHeadIndex = consumedIndex; } } if (flag && !_writerCompletion.IsCompleted) { _readerAwaitable.SetUncompleted(); } while (bufferSegment != null && bufferSegment != returnEnd) { BufferSegment? nextSegment = bufferSegment.NextSegment; bufferSegment.Reset(); ReturnSegmentUnsynchronized(bufferSegment); bufferSegment = nextSegment; } _operationState.EndRead(); } TrySchedule(WriterScheduler, in completionData); void MoveReturnEndToNextBlock() { BufferSegment nextSegment2 = returnEnd.NextSegment; if (_readTail == returnEnd) { _readTail = nextSegment2; _readTailIndex = 0; } _readHead = nextSegment2; _readHeadIndex = 0; returnEnd = nextSegment2; } } internal void CompleteReader(Exception? exception) { PipeCompletionCallbacks pipeCompletionCallbacks; CompletionData completionData; bool isCompleted; lock (SyncObj) { if (_operationState.IsReadingActive) { _operationState.EndRead(); } pipeCompletionCallbacks = _readerCompletion.TryComplete(exception); _writerAwaitable.Complete(out completionData); isCompleted = _writerCompletion.IsCompleted; } if (isCompleted) { CompletePipe(); } if (pipeCompletionCallbacks != null) { ScheduleCallbacks(WriterScheduler, pipeCompletionCallbacks); } TrySchedule(WriterScheduler, in completionData); } internal void OnWriterCompleted(Action<Exception?, object?> callback, object? 
state) {
    if (callback == null) {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback);
    }
    PipeCompletionCallbacks pipeCompletionCallbacks;
    lock (SyncObj) {
        pipeCompletionCallbacks = _writerCompletion.AddCallback(callback, state);
    }
    // Non-null only when there are callbacks to run right away.
    if (pipeCompletionCallbacks != null) {
        ScheduleCallbacks(ReaderScheduler, pipeCompletionCallbacks);
    }
}

/// <summary>Cancels the reader awaitable and schedules its continuation, if any, on <c>ReaderScheduler</c>.</summary>
internal void CancelPendingRead() {
    CompletionData completionData;
    lock (SyncObj) {
        _readerAwaitable.Cancel(out completionData);
    }
    TrySchedule(ReaderScheduler, in completionData);
}

/// <summary>Cancels the writer awaitable and schedules its continuation, if any, on <c>WriterScheduler</c>.</summary>
internal void CancelPendingFlush() {
    CompletionData completionData;
    lock (SyncObj) {
        _writerAwaitable.Cancel(out completionData);
    }
    TrySchedule(WriterScheduler, in completionData);
}

/// <summary>
/// Registers a callback on the reader completion; callbacks handed back by the registration
/// are scheduled on <c>WriterScheduler</c>.
/// </summary>
internal void OnReaderCompleted(Action<Exception?, object?> callback, object? state) {
    if (callback == null) {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback);
    }
    PipeCompletionCallbacks pipeCompletionCallbacks;
    lock (SyncObj) {
        pipeCompletionCallbacks = _readerCompletion.AddCallback(callback, state);
    }
    if (pipeCompletionCallbacks != null) {
        ScheduleCallbacks(WriterScheduler, pipeCompletionCallbacks);
    }
}

/// <summary>
/// Starts a read that is satisfied once at least <paramref name="minimumBytes"/> bytes are
/// unconsumed, or the result is canceled or completed.
/// </summary>
/// <returns>A completed result when the condition already holds; otherwise a task backed by <c>_reader</c>.</returns>
internal ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationToken token) {
    if (_readerCompletion.IsCompleted) {
        ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
    }
    if (token.IsCancellationRequested) {
        return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(token));
    }
    CompletionData completionData = default(CompletionData);
    ValueTask<ReadResult> result2;
    lock (SyncObj) {
        _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
        if (_readerAwaitable.IsCompleted) {
            GetReadResult(out var result);
            if (_unconsumedBytes >= minimumBytes || result.IsCanceled || result.IsCompleted) {
                return new ValueTask<ReadResult>(result);
            }
            // Not enough data yet: undo the synchronous completion, end the read that
            // GetReadResult began, and start the awaitable operation again.
            _readerAwaitable.SetUncompleted();
            _operationState.EndRead();
            _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
        }
        // NOTE(review): presumably this lets a writer that is waiting on its awaitable
        // continue so the requested bytes can arrive - confirm intent.
        if (!_writerAwaitable.IsCompleted) {
            _writerAwaitable.Complete(out completionData);
        }
        _minimumReadBytes = minimumBytes;
        result2 = new ValueTask<ReadResult>(_reader, 0);
    }
    TrySchedule(WriterScheduler, in completionData);
    return result2;
}

/// <summary>
/// Starts a read. Returns a completed result when the reader awaitable is already completed;
/// otherwise a task backed by <c>_reader</c>.
/// </summary>
internal ValueTask<ReadResult> ReadAsync(CancellationToken token) {
    if (_readerCompletion.IsCompleted) {
        ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
    }
    if (token.IsCancellationRequested) {
        return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(token));
    }
    lock (SyncObj) {
        _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
        if (_readerAwaitable.IsCompleted) {
            GetReadResult(out var result);
            return new ValueTask<ReadResult>(result);
        }
        return new ValueTask<ReadResult>(_reader, 0);
    }
}

/// <summary>
/// Synchronous read attempt.
/// </summary>
/// <returns>
/// True with a result when bytes are unconsumed or the reader awaitable is completed;
/// otherwise false after marking a tentative read.
/// </returns>
internal bool TryRead(out ReadResult result) {
    lock (SyncObj) {
        if (_readerCompletion.IsCompleted) {
            ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
        }
        if (_unconsumedBytes > 0 || _readerAwaitable.IsCompleted) {
            GetReadResult(out result);
            return true;
        }
        // A reader awaitable that is running here is reported as an already-active read.
        if (_readerAwaitable.IsRunning) {
            ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
        }
        _operationState.BeginReadTentative();
        result = default(ReadResult);
        return false;
    }
}

/// <summary>Queues the completion callbacks on <paramref name="scheduler"/> via <c>s_invokeCompletionCallbacks</c>.</summary>
private static void ScheduleCallbacks(PipeScheduler scheduler, PipeCompletionCallbacks completionCallbacks) {
    scheduler.UnsafeSchedule(s_invokeCompletionCallbacks, completionCallbacks);
}

/// <summary>
/// Schedules the continuation carried by <paramref name="completionData"/>, if any. Uses the
/// scheduler directly when no synchronization or execution context was captured.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void TrySchedule(PipeScheduler scheduler, in CompletionData completionData) {
    Action<object> completion = completionData.Completion;
    if (completion != null) {
        if (completionData.SynchronizationContext == null && completionData.ExecutionContext == null) {
            scheduler.UnsafeSchedule(completion, completionData.CompletionState);
        } else {
            ScheduleWithContext(scheduler, in completionData);
        }
    }
}

/// <summary>
/// Context-aware path of <see cref="TrySchedule"/>: posts to the captured synchronization
/// context when there is one, otherwise schedules through the execution-context callback.
/// </summary>
[MethodImpl(MethodImplOptions.NoInlining)]
private static void ScheduleWithContext(PipeScheduler scheduler, in CompletionData completionData) {
    if (completionData.SynchronizationContext == null) {
        // Only an execution context was captured.
        scheduler.UnsafeSchedule(s_scheduleWithExecutionContextCallback, completionData);
    } else if (completionData.ExecutionContext == null) {
        // Only a synchronization context was captured.
        completionData.SynchronizationContext.Post(s_syncContextExecuteWithoutExecutionContextCallback, completionData);
    } else {
        // Both were captured.
        completionData.SynchronizationContext.Post(s_syncContextExecutionContextCallback, completionData);
    }
}

/// <summary>Unboxes the <see cref="CompletionData"/> in <paramref name="state"/> and invokes its continuation directly.</summary>
private static void ExecuteWithoutExecutionContext(object state) {
    CompletionData completionData = (CompletionData)state;
    completionData.Completion(completionData.CompletionState);
}

/// <summary>Runs <c>s_executionContextRawCallback</c> under the execution context stored in the boxed <see cref="CompletionData"/>.</summary>
private static void ExecuteWithExecutionContext(object state) {
    ExecutionContext.Run(((CompletionData)state).ExecutionContext, s_executionContextRawCallback, state);
}

/// <summary>
/// Called once both sides have completed. Does nothing when <c>_disposed</c> is already set;
/// otherwise resets every segment reachable from the read head (or the read tail when there is
/// no head) and clears the pipe's segment references.
/// </summary>
private void CompletePipe() {
    lock (SyncObj) {
        if (!_disposed) {
            _disposed = true;
            BufferSegment bufferSegment = _readHead ?? _readTail;
            while (bufferSegment != null) {
                // Read NextSegment before Reset() is called on the current segment.
                BufferSegment bufferSegment2 = bufferSegment;
                bufferSegment = bufferSegment.NextSegment;
                bufferSegment2.Reset();
            }
            _writingHead = null;
            _writingHeadMemory = default(Memory<byte>);
            _readHead = null;
            _readTail = null;
            _lastExaminedIndex = -1L;
        }
    }
}

/// <summary>
/// Status of the pending read: Pending until the reader awaitable is completed, then Faulted
/// when the writer completion is faulted, otherwise Succeeded.
/// </summary>
internal ValueTaskSourceStatus GetReadAsyncStatus() {
    if (_readerAwaitable.IsCompleted) {
        if (_writerCompletion.IsFaulted) {
            return ValueTaskSourceStatus.Faulted;
        }
        return ValueTaskSourceStatus.Succeeded;
    }
    return ValueTaskSourceStatus.Pending;
}

/// <summary>
/// Registers the continuation for a pending read on the reader awaitable. A second registration
/// while one is still stored completes the writer with an exception.
/// </summary>
internal void OnReadAsyncCompleted(Action<object?> continuation, object? 
state, ValueTaskSourceOnCompletedFlags flags) {
    CompletionData completionData;
    bool doubleCompletion;
    lock (SyncObj) {
        _readerAwaitable.OnCompleted(continuation, state, flags, out completionData, out doubleCompletion);
    }
    // A continuation was already stored: complete the writer with a
    // no-concurrent-operation exception.
    if (doubleCompletion) {
        Writer.Complete(ThrowHelper.CreateInvalidOperationException_NoConcurrentOperation());
    }
    TrySchedule(ReaderScheduler, in completionData);
}

/// <summary>
/// Produces the result of a completed asynchronous read and releases the cancellation
/// registration. If the result is canceled and the released token is canceled, the token throws.
/// </summary>
internal ReadResult GetReadAsyncResult() {
    CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
    CancellationToken cancellationToken = default(CancellationToken);
    ReadResult result;
    try {
        lock (SyncObj) {
            if (!_readerAwaitable.IsCompleted) {
                ThrowHelper.ThrowInvalidOperationException_GetResultNotCompleted();
            }
            cancellationTokenRegistration = _readerAwaitable.ReleaseCancellationTokenRegistration(out cancellationToken);
            GetReadResult(out result);
        }
    } finally {
        // The registration is disposed outside the lock.
        cancellationTokenRegistration.Dispose();
    }
    if (result.IsCanceled) {
        cancellationToken.ThrowIfCancellationRequested();
    }
    return result;
}

/// <summary>
/// Builds a <see cref="ReadResult"/> from the current read head and tail, observes (and clears)
/// cancellation on the reader awaitable, begins a read and resets <c>_minimumReadBytes</c>.
/// </summary>
private void GetReadResult(out ReadResult result) {
    bool isCompleted = _writerCompletion.IsCompletedOrThrow();
    bool flag = _readerAwaitable.ObserveCancellation();
    BufferSegment readHead = _readHead;
    if (readHead != null) {
        ReadOnlySequence<byte> buffer = new ReadOnlySequence<byte>(readHead, _readHeadIndex, _readTail, _readTailIndex);
        result = new ReadResult(buffer, flag, isCompleted);
    } else {
        // No read head: return an empty sequence.
        result = new ReadResult(default(ReadOnlySequence<byte>), flag, isCompleted);
    }
    // A canceled result begins a tentative read; any other result begins a regular read.
    if (flag) {
        _operationState.BeginReadTentative();
    } else {
        _operationState.BeginRead();
    }
    _minimumReadBytes = 0;
}

/// <summary>
/// Status of the pending flush: Pending until the writer awaitable is completed, then Faulted
/// when the reader completion is faulted, otherwise Succeeded.
/// </summary>
internal ValueTaskSourceStatus GetFlushAsyncStatus() {
    if (_writerAwaitable.IsCompleted) {
        if (_readerCompletion.IsFaulted) {
            return ValueTaskSourceStatus.Faulted;
        }
        return ValueTaskSourceStatus.Succeeded;
    }
    return ValueTaskSourceStatus.Pending;
}

/// <summary>
/// Produces the result of a completed asynchronous flush and releases the cancellation registration.
/// </summary>
internal FlushResult GetFlushAsyncResult() {
    FlushResult result = default(FlushResult);
    CancellationToken cancellationToken = default(CancellationToken);
    CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
    try {
        lock (SyncObj) {
            if (!_writerAwaitable.IsCompleted) {
                ThrowHelper.ThrowInvalidOperationException_GetResultNotCompleted();
            }
            GetFlushResult(ref result);
            cancellationTokenRegistration = _writerAwaitable.ReleaseCancellationTokenRegistration(out cancellationToken);
            return result;
        }
    } finally {
        // Runs on the return path as well: when the released token is canceled,
        // this throws instead of letting the result be returned.
        cancellationTokenRegistration.Dispose();
        cancellationToken.ThrowIfCancellationRequested();
    }
}

/// <summary>Returns <c>_unflushedBytes</c>; read without taking the lock.</summary>
internal long GetUnflushedBytes() {
    return _unflushedBytes;
}

/// <summary>
/// Sets the Canceled flag when the writer awaitable observed a cancellation and the Completed
/// flag when the reader has completed.
/// </summary>
private void GetFlushResult(ref FlushResult result) {
    if (_writerAwaitable.ObserveCancellation()) {
        result._resultFlags |= ResultFlags.Canceled;
    }
    if (_readerCompletion.IsCompletedOrThrow()) {
        result._resultFlags |= ResultFlags.Completed;
    }
}

/// <summary>
/// Copies <paramref name="source"/> into the pipe and prepares a flush in one locked step.
/// </summary>
/// <returns>
/// A completed result with isCompleted set when the reader has completed, a canceled task when
/// the token is already canceled, otherwise the flush task.
/// </returns>
internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken) {
    if (_writerCompletion.IsCompleted) {
        ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
    }
    if (_readerCompletion.IsCompletedOrThrow()) {
        return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
    }
    if (cancellationToken.IsCancellationRequested) {
        return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken));
    }
    CompletionData completionData;
    ValueTask<FlushResult> result;
    lock (SyncObj) {
        AllocateWriteHeadIfNeeded(0);
        if (source.Length <= _writingHeadMemory.Length) {
            // Fits in the current write buffer.
            source.CopyTo(_writingHeadMemory);
            AdvanceCore(source.Length);
        } else {
            WriteMultiSegment(source.Span);
        }
        PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
    }
    TrySchedule(ReaderScheduler, in completionData);
    return result;
}

/// <summary>
/// Copies <paramref name="source"/> into the writing head and, while bytes remain, into newly
/// allocated segments linked after it.
/// </summary>
private void WriteMultiSegment(ReadOnlySpan<byte> source) {
    Span<byte> span = _writingHeadMemory.Span;
    while (true) {
        // Copy as much as fits into the current span.
        int num = Math.Min(span.Length, source.Length);
        source.Slice(0, num).CopyTo(span);
        source = source.Slice(num);
        AdvanceCore(num);
        if (source.Length != 0) {
            // Close off the current head, then allocate and link the next segment.
            _writingHead.End += _writingHeadBytesBuffered;
            _writingHeadBytesBuffered = 0;
            BufferSegment bufferSegment = AllocateSegment(0);
            _writingHead.SetNext(bufferSegment);
            _writingHead = bufferSegment;
            // NOTE(review): presumably AllocateSegment also refreshes _writingHeadMemory - confirm.
            span = _writingHeadMemory.Span;
            continue;
        }
        break;
    }
}

/// <summary>
/// Registers the continuation for a pending flush on the writer awaitable. A second
/// registration while one is still stored completes the reader with an exception.
/// </summary>
internal void OnFlushAsyncCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags) {
    CompletionData completionData;
    bool doubleCompletion;
    lock (SyncObj) {
        _writerAwaitable.OnCompleted(continuation, state, flags, out completionData, out doubleCompletion);
    }
    if (doubleCompletion) {
        Reader.Complete(ThrowHelper.CreateInvalidOperationException_NoConcurrentOperation());
    }
    TrySchedule(WriterScheduler, in completionData);
}

/// <summary>Forwards a fired cancellation token to the reader awaitable and schedules the extracted continuation.</summary>
private void ReaderCancellationRequested() {
    CompletionData completionData;
    lock (SyncObj) {
        _readerAwaitable.CancellationTokenFired(out completionData);
    }
    TrySchedule(ReaderScheduler, in completionData);
}

/// <summary>Forwards a fired cancellation token to the writer awaitable and schedules the extracted continuation.</summary>
private void WriterCancellationRequested() {
    CompletionData completionData;
    lock (SyncObj) {
        _writerAwaitable.CancellationTokenFired(out completionData);
    }
    TrySchedule(WriterScheduler, in completionData);
}

/// <summary>
/// Makes the pipe reusable. Goes through the reset-incomplete throw helper unless
/// <c>_disposed</c> is set; otherwise clears the flag and calls <c>ResetState</c>.
/// </summary>
public void Reset() {
    lock (SyncObj) {
        if (!_disposed) {
            ThrowHelper.ThrowInvalidOperationException_ResetIncompleteReaderWriter();
        }
        _disposed = false;
        ResetState();
    }
}
}

/// <summary>
/// Flag-based state holder for one awaitable operation: the stored continuation, its state,
/// the captured scheduling context and the cancellation registration.
/// </summary>
[DebuggerDisplay("CanceledState = {_awaitableState}, IsCompleted = {IsCompleted}")]
internal struct PipeAwaitable {
    [Flags]
    private enum AwaitableState {
        None = 0,
        Completed = 1,
        Running = 2,
        Canceled = 4,
        UseSynchronizationContext = 8
    }

    // Holder for the contexts captured when a continuation is registered.
    private sealed class SchedulingContext {
        public SynchronizationContext SynchronizationContext { get; set; }
        public ExecutionContext ExecutionContext { get; set; }
    }

    private AwaitableState _awaitableState;
    private Action<object> _completion;
    private object _completionState;
    private SchedulingContext _schedulingContext;
    private CancellationTokenRegistration _cancellationTokenRegistration;
    private CancellationToken _cancellationToken;

    // Private accessor for the token recorded for the current operation.
    private CancellationToken 
CancellationToken => _cancellationToken;

    /// <summary>True when either the Completed or the Canceled flag is set.</summary>
    public bool IsCompleted {
        get {
            const AwaitableState finished = AwaitableState.Completed | AwaitableState.Canceled;
            return (_awaitableState & finished) != AwaitableState.None;
        }
    }

    /// <summary>True while the Running flag is set.</summary>
    public bool IsRunning => (_awaitableState & AwaitableState.Running) == AwaitableState.Running;

    /// <summary>Creates the awaitable with the requested initial flags and no stored continuation.</summary>
    public PipeAwaitable(bool completed, bool useSynchronizationContext) {
        AwaitableState initial = AwaitableState.None;
        if (completed) {
            initial |= AwaitableState.Completed;
        }
        if (useSynchronizationContext) {
            initial |= AwaitableState.UseSynchronizationContext;
        }

        _awaitableState = initial;
        _completion = null;
        _completionState = null;
        _schedulingContext = null;
        _cancellationToken = CancellationToken.None;
        _cancellationTokenRegistration = default(CancellationTokenRegistration);
    }

    /// <summary>
    /// Marks the awaitable as running. When the token can be canceled and the awaitable is not
    /// already completed, the callback is registered on the token and the token is remembered.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void BeginOperation(CancellationToken cancellationToken, Action<object?> callback, object? state) {
        bool shouldRegister = cancellationToken.CanBeCanceled && !IsCompleted;
        if (shouldRegister) {
            _cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, callback, state);

            // A default registration came back: let the token throw if it is already canceled.
            bool gotDefaultRegistration = _cancellationTokenRegistration == default(CancellationTokenRegistration);
            if (gotDefaultRegistration) {
                cancellationToken.ThrowIfCancellationRequested();
            }

            _cancellationToken = cancellationToken;
        }

        _awaitableState |= AwaitableState.Running;
    }

    /// <summary>Hands out the stored continuation (if any) and sets the Completed flag.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void Complete(out CompletionData completionData) {
        ExtractCompletion(out completionData);
        _awaitableState |= AwaitableState.Completed;
    }

    /// <summary>
    /// Moves the stored continuation, its state and the captured contexts into
    /// <paramref name="completionData"/> and clears them here. Yields default when nothing was stored.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private void ExtractCompletion(out CompletionData completionData) {
        Action<object> continuation = _completion;
        object continuationState = _completionState;
        SchedulingContext context = _schedulingContext;

        _completion = null;
        _completionState = null;
        _schedulingContext = null;

        if (continuation == null) {
            completionData = default(CompletionData);
            return;
        }

        completionData = new CompletionData(continuation, continuationState, context?.ExecutionContext, context?.SynchronizationContext);
    }

    /// <summary>Clears the Completed flag; every other flag is left untouched.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void SetUncompleted() {
        _awaitableState &= ~AwaitableState.Completed;
    }

    /// <summary>
    /// Stores <paramref name="continuation"/> for later completion. When the awaitable is already
    /// completed, or a continuation is already stored, the new continuation is handed straight
    /// back through <paramref name="completionData"/> instead.
    /// </summary>
    /// <param name="doubleCompletion">True when a continuation was already stored.</param>
    public void OnCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags, out CompletionData completionData, out bool doubleCompletion) {
        doubleCompletion = _completion != null;
        if (IsCompleted || doubleCompletion) {
            completionData = new CompletionData(continuation, state, _schedulingContext?.ExecutionContext, _schedulingContext?.SynchronizationContext);
            return;
        }

        completionData = default(CompletionData);
        _completion = continuation;
        _completionState = state;

        bool honorSyncContext = (_awaitableState & AwaitableState.UseSynchronizationContext) != 0
            && (flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0;
        if (honorSyncContext) {
            // Only a context whose type is not the base SynchronizationContext is captured.
            SynchronizationContext ambient = SynchronizationContext.Current;
            bool isCustomContext = ambient != null && ambient.GetType() != typeof(SynchronizationContext);
            if (isCustomContext) {
                _schedulingContext ??= new SchedulingContext();
                _schedulingContext.SynchronizationContext = ambient;
            }
        }

        bool flowExecutionContext = (flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0;
        if (flowExecutionContext) {
            _schedulingContext ??= new SchedulingContext();
            _schedulingContext.ExecutionContext = ExecutionContext.Capture();
        }
    }

    /// <summary>Hands out the stored continuation (if any) and sets the Canceled flag.</summary>
    public void Cancel(out CompletionData completionData) {
        ExtractCompletion(out completionData);
        _awaitableState |= AwaitableState.Canceled;
    }

    /// <summary>Cancels the awaitable only when the remembered token really is canceled.</summary>
    public void CancellationTokenFired(out CompletionData completionData) {
        if (!CancellationToken.IsCancellationRequested) {
            completionData = default(CompletionData);
            return;
        }

        Cancel(out completionData);
    }

    /// <summary>Reports whether the Canceled flag was set, then clears both Running and Canceled.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool ObserveCancellation() {
        bool wasCanceled = (_awaitableState & AwaitableState.Canceled) != 0;
        _awaitableState &= ~(AwaitableState.Running | AwaitableState.Canceled);
        return wasCanceled;
    }

    /// <summary>
    /// Returns the current registration and token to the caller and resets both fields to default.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public CancellationTokenRegistration ReleaseCancellationTokenRegistration(out CancellationToken cancellationToken) {
        CancellationTokenRegistration released = _cancellationTokenRegistration;
        cancellationToken = CancellationToken;

        _cancellationTokenRegistration = default(CancellationTokenRegistration);
        _cancellationToken = default(CancellationToken);
        return released;
    }
}

/// <summary>
/// Completion state of one side of the pipe: not completed (null state), completed successfully
/// (sentinel object) or completed with a captured exception, plus the callbacks waiting on it.
/// </summary>
[DebuggerDisplay("IsCompleted = {IsCompleted}")]
internal struct PipeCompletion {
    private static readonly object s_completedSuccessfully = new object();

    private object _state;
    private List<PipeCompletionCallback> _callbacks;

    public bool IsCompleted => !(_state is null);

    public bool IsFaulted => _state is ExceptionDispatchInfo;

    /// <summary>
    /// Records the completion if none has been recorded yet; an earlier completion is kept.
    /// Returns the pending callbacks, or null when there are none.
    /// </summary>
    public PipeCompletionCallbacks? TryComplete(Exception? exception = null) {
        if (_state == null) {
            _state = (exception == null)
                ? s_completedSuccessfully
                : (object)ExceptionDispatchInfo.Capture(exception);
        }

        return GetCallbacks();
    }

    /// <summary>
    /// Adds a completion callback. Returns the callbacks to run when the completion has already
    /// been recorded, otherwise null.
    /// </summary>
    public PipeCompletionCallbacks? AddCallback(Action<Exception?, object?> callback, object? 
state) {
        (_callbacks ??= new List<PipeCompletionCallback>()).Add(new PipeCompletionCallback(callback, state));
        return IsCompleted ? GetCallbacks() : null;
    }

    /// <summary>
    /// Returns false while not completed and true when completed successfully; rethrows the
    /// captured exception when the completion is faulted.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool IsCompletedOrThrow() {
        if (_state == null) {
            return false;
        }

        (_state as ExceptionDispatchInfo)?.Throw();
        return true;
    }

    /// <summary>
    /// Detaches the pending callback list and wraps it with the captured exception info
    /// (null unless faulted). Returns null when no callbacks are pending.
    /// </summary>
    private PipeCompletionCallbacks GetCallbacks() {
        if (_callbacks == null) {
            return null;
        }

        List<PipeCompletionCallback> pending = _callbacks;
        _callbacks = null;
        return new PipeCompletionCallbacks(pending, _state as ExceptionDispatchInfo);
    }

    /// <summary>Forgets the recorded completion; pending callbacks are left as they are.</summary>
    public void Reset() => _state = null;

    public override string ToString() => string.Format("{0}: {1}", "IsCompleted", IsCompleted);
}

/// <summary>A completion callback paired with the state object passed to it.</summary>
internal readonly struct PipeCompletionCallback {
    public readonly Action<Exception?, object?> Callback;

    public readonly object? State;

    public PipeCompletionCallback(Action<Exception?, object?> callback, object? state) {
        State = state;
        Callback = callback;
    }
}

/// <summary>A batch of completion callbacks together with the exception they should receive.</summary>
internal sealed class PipeCompletionCallbacks {
    private readonly List<PipeCompletionCallback> _callbacks;

    private readonly Exception _exception;

    public PipeCompletionCallbacks(List<PipeCompletionCallback> callbacks, ExceptionDispatchInfo? 
edi) {
        _callbacks = callbacks;
        // Only the original exception is kept; it is passed to every callback.
        _exception = edi?.SourceException;
    }

    /// <summary>
    /// Invokes every callback in order. Exceptions thrown by callbacks are collected and
    /// rethrown together as an <see cref="AggregateException"/> after all callbacks have run.
    /// </summary>
    public void Execute() {
        int count = _callbacks.Count;
        if (count != 0) {
            List<Exception> exceptions = null;
            for (int i = 0; i < count; i++) {
                PipeCompletionCallback callback = _callbacks[i];
                Execute(callback, ref exceptions);
            }
            if (exceptions != null) {
                throw new AggregateException(exceptions);
            }
        }
    }

    /// <summary>
    /// Invokes one callback; a thrown exception is appended to <paramref name="exceptions"/>
    /// (created on first use) instead of propagating.
    /// </summary>
    private void Execute(PipeCompletionCallback callback, ref List<Exception> exceptions) {
        try {
            callback.Callback(_exception, callback.State);
        } catch (Exception item) {
            if (exceptions == null) {
                exceptions = new List<Exception>();
            }
            exceptions.Add(item);
        }
    }
}

/// <summary>
/// Immutable configuration for a pipe: memory pool, reader/writer schedulers, writer
/// pause/resume thresholds, minimum segment size and synchronization-context usage.
/// </summary>
public class PipeOptions {
    // Defaults applied when the matching constructor argument is left at -1.
    private const int DefaultMinimumSegmentSize = 4096;
    private const long DefaultPauseWriterThreshold = 65536L;
    private const long DefaultResumeWriterThreshold = DefaultPauseWriterThreshold / 2;

    // Values always assigned to InitialSegmentPoolSize / MaxSegmentPoolSize.
    private const int DefaultInitialSegmentPoolSize = 4;
    private const int DefaultMaxSegmentPoolSize = 256;

    public static PipeOptions Default { get; } = new PipeOptions(null, null, null, -1L, -1L);

    public bool UseSynchronizationContext { get; }

    public long PauseWriterThreshold { get; }

    public long ResumeWriterThreshold { get; }

    public int MinimumSegmentSize { get; }

    public PipeScheduler WriterScheduler { get; }

    public PipeScheduler ReaderScheduler { get; }

    public MemoryPool<byte> Pool { get; }

    internal bool IsDefaultSharedMemoryPool { get; }

    internal int InitialSegmentPoolSize { get; }

    internal int MaxSegmentPoolSize { get; }

    /// <summary>Creates the options, substituting defaults for omitted arguments.</summary>
    /// <param name="pool">Memory pool; <see cref="MemoryPool{T}.Shared"/> when null.</param>
    /// <param name="readerScheduler">Scheduler for reader continuations; <see cref="PipeScheduler.ThreadPool"/> when null.</param>
    /// <param name="writerScheduler">Scheduler for writer continuations; <see cref="PipeScheduler.ThreadPool"/> when null.</param>
    /// <param name="pauseWriterThreshold">-1 selects 65536; any other negative value is rejected.</param>
    /// <param name="resumeWriterThreshold">
    /// -1 selects 32768 and 0 is raised to 1. Rejected when negative, or when greater than a
    /// positive <paramref name="pauseWriterThreshold"/>.
    /// </param>
    /// <param name="minimumSegmentSize">-1 selects 4096; any other value is stored as given.</param>
    /// <param name="useSynchronizationContext">Stored as given.</param>
    public PipeOptions(MemoryPool<byte>? pool = null, PipeScheduler? readerScheduler = null, PipeScheduler? writerScheduler = null, long pauseWriterThreshold = -1L, long resumeWriterThreshold = -1L, int minimumSegmentSize = -1, bool useSynchronizationContext = true) {
        MinimumSegmentSize = (minimumSegmentSize == -1) ? DefaultMinimumSegmentSize : minimumSegmentSize;
        InitialSegmentPoolSize = DefaultInitialSegmentPoolSize;
        MaxSegmentPoolSize = DefaultMaxSegmentPoolSize;

        if (pauseWriterThreshold == -1) {
            pauseWriterThreshold = DefaultPauseWriterThreshold;
        } else if (pauseWriterThreshold < 0) {
            ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.pauseWriterThreshold);
        }

        if (resumeWriterThreshold == -1) {
            resumeWriterThreshold = DefaultResumeWriterThreshold;
        } else if (resumeWriterThreshold == 0) {
            // A zero resume threshold is stored as 1.
            resumeWriterThreshold = 1;
        }

        // The resume threshold may not exceed the pause threshold unless the latter is 0.
        if (resumeWriterThreshold < 0 || (pauseWriterThreshold > 0 && resumeWriterThreshold > pauseWriterThreshold)) {
            ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.resumeWriterThreshold);
        }

        Pool = pool ?? MemoryPool<byte>.Shared;
        IsDefaultSharedMemoryPool = Pool == MemoryPool<byte>.Shared;
        ReaderScheduler = readerScheduler ?? PipeScheduler.ThreadPool;
        WriterScheduler = writerScheduler ?? PipeScheduler.ThreadPool;
        PauseWriterThreshold = pauseWriterThreshold;
        ResumeWriterThreshold = resumeWriterThreshold;
        UseSynchronizationContext = useSynchronizationContext;
    }
}

/// <summary>Abstract reading side of a pipe.</summary>
public abstract class PipeReader {
    // Stream wrapper, created on the first AsStream call.
    private PipeReaderStream _stream;

    public abstract bool TryRead(out ReadResult result);

    public abstract ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken));

    /// <summary>
    /// Validates <paramref name="minimumSize"/> (must not be negative) and delegates to
    /// <see cref="ReadAtLeastAsyncCore"/>.
    /// </summary>
    public ValueTask<ReadResult> ReadAtLeastAsync(int minimumSize, CancellationToken cancellationToken = default(CancellationToken)) {
        if (minimumSize < 0) {
            ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.minimumSize);
        }
        return ReadAtLeastAsyncCore(minimumSize, cancellationToken);
    }

    /// <summary>
    /// Default implementation: reads repeatedly until the buffer holds at least
    /// <paramref name="minimumSize"/> bytes or the result is completed or canceled. Between
    /// attempts it marks the whole buffer as examined while consuming nothing.
    /// </summary>
    protected virtual async ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken) {
        ReadResult result;
        while (true) {
            result = await ReadAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
            ReadOnlySequence<byte> buffer = result.Buffer;
            if (buffer.Length >= minimumSize || result.IsCompleted || result.IsCanceled) {
                break;
            }
            AdvanceTo(buffer.Start, buffer.End);
        }
        return result;
    }

    public abstract void 
AdvanceTo(SequencePosition consumed);

    public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);

    /// <summary>
    /// Returns the stream wrapper over this reader, creating it on first use. On later calls a
    /// true <paramref name="leaveOpen"/> is applied to the existing wrapper; false changes nothing.
    /// </summary>
    public virtual Stream AsStream(bool leaveOpen = false) {
        if (_stream == null) {
            _stream = new PipeReaderStream(this, leaveOpen);
            return _stream;
        }

        if (leaveOpen) {
            _stream.LeaveOpen = true;
        }

        return _stream;
    }

    public abstract void CancelPendingRead();

    public abstract void Complete(Exception? exception = null);

    /// <summary>
    /// Calls <see cref="Complete"/> synchronously; an exception it throws is returned as a
    /// faulted task instead of propagating.
    /// </summary>
    public virtual ValueTask CompleteAsync(Exception? exception = null) {
        try {
            Complete(exception);
        } catch (Exception failure) {
            return new ValueTask(Task.FromException(failure));
        }

        return default(ValueTask);
    }

    [Obsolete("OnWriterCompleted has been deprecated and may not be invoked on all implementations of PipeReader.")]
    public virtual void OnWriterCompleted(Action<Exception?, object?> callback, object? state) {
    }

    /// <summary>Creates a reader over <paramref name="stream"/>, using the shared default options when none are given.</summary>
    public static PipeReader Create(Stream stream, StreamPipeReaderOptions? readerOptions = null) {
        StreamPipeReaderOptions effectiveOptions = readerOptions ?? StreamPipeReaderOptions.s_default;
        return new StreamPipeReader(stream, effectiveOptions);
    }

    /// <summary>Creates a reader over an in-memory sequence.</summary>
    public static PipeReader Create(ReadOnlySequence<byte> sequence) {
        return new SequencePipeReader(sequence);
    }

    /// <summary>Copies everything read from this reader into <paramref name="destination"/>.</summary>
    public virtual Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default(CancellationToken)) {
        if (destination == null) {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.destination);
        }

        if (cancellationToken.IsCancellationRequested) {
            return Task.FromCanceled(cancellationToken);
        }

        return CopyToAsyncCore(
            destination,
            (writer, chunk, token) => writer.WriteAsync(chunk, token),
            cancellationToken);
    }

    /// <summary>Copies everything read from this reader into the stream <paramref name="destination"/>.</summary>
    public virtual Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default(CancellationToken)) {
        if (destination == null) {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.destination);
        }

        if (cancellationToken.IsCancellationRequested) {
            return Task.FromCanceled(cancellationToken);
        }

        return CopyToAsyncCore(
            destination,
            (stream, chunk, token) => {
                ValueTask pendingWrite = StreamExtensions.WriteAsync(stream, chunk, token);
                if (!pendingWrite.IsCompletedSuccessfully) {
                    return Awaited(pendingWrite);
                }

                // Already finished: consume the result without going through the async path.
                pendingWrite.GetAwaiter().GetResult();
                return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: false));
            },
            cancellationToken);

        // Adapts a stream write to the FlushResult shape expected by CopyToAsyncCore.
        static async ValueTask<FlushResult> Awaited(ValueTask writeTask) {
            await writeTask.ConfigureAwait(continueOnCapturedContext: false);
            return new FlushResult(isCanceled: false, isCompleted: false);
        }
    }

    /// <summary>
    /// Shared copy loop: reads, writes every non-empty segment through
    /// <paramref name="writeAsync"/> and always advances the reader by what was consumed.
    /// Stops when the read result is completed or a write reports completion.
    /// </summary>
    private async Task CopyToAsyncCore<TStream>(TStream destination, Func<TStream, ReadOnlyMemory<byte>, CancellationToken, ValueTask<FlushResult>> writeAsync, CancellationToken cancellationToken) {
        bool sourceCompleted;
        do {
            ReadResult result = await ReadAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
            ReadOnlySequence<byte> buffer = result.Buffer;
            SequencePosition position = buffer.Start;
            SequencePosition consumed = position;
            try {
                if (result.IsCanceled) {
                    ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
                }

                while (buffer.TryGet(ref position, out ReadOnlyMemory<byte> memory)) {
                    bool destinationCompleted = false;
                    if (!memory.IsEmpty) {
                        FlushResult flushResult = await writeAsync(destination, memory, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                        if (flushResult.IsCanceled) {
                            ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
                        }

                        destinationCompleted = flushResult.IsCompleted;
                    }

                    // The segment (empty or written) now counts as consumed.
                    consumed = position;
                    if (destinationCompleted) {
                        return;
                    }
                }

                consumed = buffer.End;
                sourceCompleted = result.IsCompleted;
            } finally {
                AdvanceTo(consumed);
            }
        } while (!sourceCompleted);
    }
}

/// <summary>Bit flags tracking whether a read, a tentative read or a write is in progress.</summary>
[DebuggerDisplay("State = {_state}")]
internal struct PipeOperationState {
    [Flags]
    internal enum State : byte {
        Reading = 1,
        ReadingTentative = 2,
        Writing = 4
    }

    private State _state;

    public bool IsWritingActive => (_state & State.Writing) != 0;

    public bool IsReadingActive => (_state & State.Reading) != 0;

    /// <summary>Sets the Reading flag; goes through the already-reading throw helper when it is set.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void BeginRead() {
        if (IsReadingActive) {
            ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
        }

        _state |= State.Reading;
    }

    /// <summary>Sets the ReadingTentative flag; goes through the already-reading throw helper when Reading is set.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void BeginReadTentative() {
        if (IsReadingActive) {
            ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
        }

        _state |= State.ReadingTentative;
    }

    /// <summary>Clears both read flags; goes through the no-read-to-complete throw helper when neither is set.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void EndRead() {
        const State anyRead = State.Reading | State.ReadingTentative;
        if ((_state & anyRead) == 0) {
            ThrowHelper.ThrowInvalidOperationException_NoReadToComplete();
        }

        _state &= ~anyRead;
    }

    /// <summary>Sets the Writing flag.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void BeginWrite() => _state |= State.Writing;

    /// <summary>Clears the Writing flag.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void EndWrite() {
        _state &= 
~State.Writing;
    }
}

/// <summary>Read-only, non-seekable <see cref="Stream"/> view over a <see cref="PipeReader"/>.</summary>
internal sealed class PipeReaderStream : Stream {
    private readonly PipeReader _pipeReader;

    public override bool CanRead => true;

    public override bool CanSeek => false;

    public override bool CanWrite => false;

    public override long Length => throw new NotSupportedException();

    public override long Position {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    /// <summary>When false, disposing the stream also completes the underlying reader.</summary>
    internal bool LeaveOpen { get; set; }

    public PipeReaderStream(PipeReader pipeReader, bool leaveOpen) {
        LeaveOpen = leaveOpen;
        _pipeReader = pipeReader;
    }

    protected override void Dispose(bool disposing) {
        bool ownsReader = !LeaveOpen;
        if (ownsReader) {
            _pipeReader.Complete();
        }

        base.Dispose(disposing);
    }

    public override void Flush() {
    }

    /// <summary>Synchronous read into the given array range.</summary>
    public override int Read(byte[] buffer, int offset, int count) {
        if (buffer == null) {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer);
        }

        Span<byte> target = new Span<byte>(buffer, offset, count);
        return ReadInternal(target);
    }

    /// <summary>Reads one byte; returns -1 when the read yields no bytes.</summary>
    public override int ReadByte() {
        Span<byte> single = stackalloc byte[1];
        return ReadInternal(single) == 0 ? -1 : single[0];
    }

    /// <summary>Runs one reader read to completion on the calling thread and copies the data out.</summary>
    private int ReadInternal(Span<byte> buffer) {
        ValueTask<ReadResult> pending = _pipeReader.ReadAsync();

        ReadResult result;
        if (pending.IsCompletedSuccessfully) {
            result = pending.Result;
        } else {
            // Not finished (or not successful): wait on the task form.
            result = pending.AsTask().GetAwaiter().GetResult();
        }

        return HandleReadResult(result, buffer);
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

    public override void SetLength(long value) => throw new NotSupportedException();

    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) {
        Task<int> readTask = ReadAsync(buffer, offset, count, default(CancellationToken));
        return TaskToAsyncResult.Begin(readTask, callback, state);
    }

    public sealed override int EndRead(IAsyncResult asyncResult) => TaskToAsyncResult.End<int>(asyncResult);

    /// <summary>Asynchronous read into the given array range.</summary>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
        if (buffer == null) {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer);
        }

        Memory<byte> target = new Memory<byte>(buffer, offset, count);
        return ReadAsyncInternal(target, cancellationToken).AsTask();
    }

    private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken) {
        ReadResult result = await _pipeReader.ReadAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
        return HandleReadResult(result, buffer.Span);
    }

    /// <summary>
    /// Copies as much of the read result as fits into <paramref name="buffer"/> and advances the
    /// reader by exactly that amount. Returns 0 for an empty, completed result; an empty result
    /// that is not completed goes through the invalid-zero-byte-read throw helper.
    /// </summary>
    private int HandleReadResult(ReadResult result, Span<byte> buffer) {
        if (result.IsCanceled) {
            ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
        }

        ReadOnlySequence<byte> sequence = result.Buffer;
        long available = sequence.Length;
        SequencePosition consumed = sequence.Start;
        try {
            if (available == 0L) {
                if (result.IsCompleted) {
                    return 0;
                }
            } else {
                int toCopy = (int)Math.Min(available, buffer.Length);
                ReadOnlySequence<byte> source = sequence;
                if (toCopy != available) {
                    source = sequence.Slice(0, toCopy);
                }

                consumed = source.End;
                source.CopyTo(buffer);
                return toCopy;
            }
        } finally {
            // Runs on every path, including the early returns above.
            _pipeReader.AdvanceTo(consumed);
        }

        ThrowHelper.ThrowInvalidOperationException_InvalidZeroByteRead();
        return 0;
    }

    public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) {
        StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
        return _pipeReader.CopyToAsync(destination, cancellationToken);
    }
}

/// <summary>Abstraction for queuing work items; exposes a thread-pool and an inline instance.</summary>
public abstract class PipeScheduler {
    private static readonly ThreadPoolScheduler s_threadPoolScheduler = new ThreadPoolScheduler();

    private static readonly InlineScheduler s_inlineScheduler = new InlineScheduler();

    public static PipeScheduler ThreadPool => s_threadPoolScheduler;

    public static PipeScheduler Inline => s_inlineScheduler;

    public abstract void Schedule(Action<object?> action, object? state);

    /// <summary>Internal scheduling hook; the base implementation forwards to <see cref="Schedule"/>.</summary>
    internal virtual void UnsafeSchedule(Action<object?> action, object? state) => Schedule(action, state);
}

/// <summary>Abstract writing side of a pipe.</summary>
public abstract class PipeWriter : IBufferWriter<byte> {
    // Stream wrapper, created on the first AsStream call.
    private PipeWriterStream _stream;

    public virtual bool CanGetUnflushedBytes => false;

    public virtual long UnflushedBytes => throw ThrowHelper.CreateNotSupportedException_UnflushedBytes();

    public abstract void Complete(Exception? exception = null);

    /// <summary>
    /// Calls <see cref="Complete"/> synchronously; an exception it throws is returned as a
    /// faulted task instead of propagating.
    /// </summary>
    public virtual ValueTask CompleteAsync(Exception? exception = null) {
        try {
            Complete(exception);
        } catch (Exception failure) {
            return new ValueTask(Task.FromException(failure));
        }

        return default(ValueTask);
    }

    public abstract void CancelPendingFlush();

    [Obsolete("OnReaderCompleted has been deprecated and may not be invoked on all implementations of PipeWriter.")]
    public virtual void OnReaderCompleted(Action<Exception?, object?> callback, object? 
state) { }

    public abstract ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default(CancellationToken));

    public abstract void Advance(int bytes);

    public abstract Memory<byte> GetMemory(int sizeHint = 0);

    public abstract Span<byte> GetSpan(int sizeHint = 0);

    /// <summary>
    /// Returns the stream wrapper for this writer, creating it on first use. On later calls
    /// the cached wrapper is reused and its <c>LeaveOpen</c> is only ever switched on, never off.
    /// </summary>
    public virtual Stream AsStream(bool leaveOpen = false)
    {
        if (_stream == null)
        {
            _stream = new PipeWriterStream(this, leaveOpen);
            return _stream;
        }

        if (leaveOpen)
        {
            _stream.LeaveOpen = true;
        }

        return _stream;
    }

    /// <summary>Creates a <c>StreamPipeWriter</c> over <paramref name="stream"/>, using the shared default options when none are given.</summary>
    public static PipeWriter Create(Stream stream, StreamPipeWriterOptions? writerOptions = null)
    {
        StreamPipeWriterOptions effectiveOptions = writerOptions ?? StreamPipeWriterOptions.s_default;
        return new StreamPipeWriter(stream, effectiveOptions);
    }

    /// <summary>Writes <paramref name="source"/> into the writer and then flushes.</summary>
    public virtual ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
    {
        this.Write(source.Span);
        return FlushAsync(cancellationToken);
    }

    /// <summary>
    /// Repeatedly reads from <paramref name="source"/> into this writer's memory and flushes,
    /// stopping when a read returns zero bytes or a flush reports completion.
    /// </summary>
    protected internal virtual async Task CopyFromAsync(Stream source, CancellationToken cancellationToken = default(CancellationToken))
    {
        while (true)
        {
            Memory<byte> target = GetMemory();
            int bytesRead = await StreamExtensions.ReadAsync(source, target, cancellationToken).ConfigureAwait(false);
            if (bytesRead == 0)
            {
                // Zero bytes read: nothing more to copy.
                break;
            }

            Advance(bytesRead);

            FlushResult flush = await FlushAsync(cancellationToken).ConfigureAwait(false);
            if (flush.IsCanceled)
            {
                ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
            }

            if (flush.IsCompleted)
            {
                break;
            }
        }
    }
}

/// <summary>Write-only, non-seekable <see cref="Stream"/> over a <see cref="PipeWriter"/>.</summary>
internal sealed class PipeWriterStream : Stream
{
    private readonly PipeWriter _pipeWriter;

    internal bool LeaveOpen { get; set; }

    public override bool CanRead => false;

    public override bool CanSeek => false;

    public override bool CanWrite => true;

    public override long Length => throw new NotSupportedException();

    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public PipeWriterStream(PipeWriter pipeWriter, bool leaveOpen) {
_pipeWriter = pipeWriter; LeaveOpen = leaveOpen; }

    /// <summary>Completes the wrapped writer unless <c>LeaveOpen</c> is set.</summary>
    protected override void Dispose(bool disposing)
    {
        if (LeaveOpen)
        {
            return;
        }

        _pipeWriter.Complete();
    }

    /// <summary>Blocks on <c>FlushAsync()</c> until it finishes.</summary>
    public override void Flush() => FlushAsync().GetAwaiter().GetResult();

    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

    public override void SetLength(long value) => throw new NotSupportedException();

    /// <summary>APM-style entry point layered over <c>WriteAsync</c>.</summary>
    public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
    {
        Task pendingWrite = WriteAsync(buffer, offset, count, default(CancellationToken));
        return TaskToAsyncResult.Begin(pendingWrite, callback, state);
    }

    public sealed override void EndWrite(IAsyncResult asyncResult) => TaskToAsyncResult.End(asyncResult);

    /// <summary>Blocks on the three-argument <c>WriteAsync</c> overload until it finishes.</summary>
    public override void Write(byte[] buffer, int offset, int count) => WriteAsync(buffer, offset, count).GetAwaiter().GetResult();

    /// <summary>Writes the given array segment through the wrapped pipe writer.</summary>
    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        if (buffer == null)
        {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer);
        }

        ReadOnlyMemory<byte> payload = new ReadOnlyMemory<byte>(buffer, offset, count);
        return GetFlushResultAsTask(_pipeWriter.WriteAsync(payload, cancellationToken));
    }

    public override Task FlushAsync(CancellationToken cancellationToken) => GetFlushResultAsTask(_pipeWriter.FlushAsync(cancellationToken));

    /// <summary>
    /// Converts a flush <see cref="ValueTask{TResult}"/> into a plain <see cref="Task"/>,
    /// calling the flush-canceled throw helper when the result reports cancellation.
    /// </summary>
    private static Task GetFlushResultAsTask(ValueTask<FlushResult> valueTask)
    {
        if (!valueTask.IsCompletedSuccessfully)
        {
            // Not finished (or finished unsuccessfully): defer to the awaiting helper.
            return AwaitTask(valueTask);
        }

        if (valueTask.Result.IsCanceled)
        {
            ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
        }

        return Task.CompletedTask;

        static async Task AwaitTask(ValueTask<FlushResult> pending)
        {
            FlushResult flushResult = await pending.ConfigureAwait(false);
            if (flushResult.IsCanceled)
            {
                ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
            }
        }
    }
}

public readonly struct ReadResult { internal readonly
ReadOnlySequence<byte> _resultBuffer;

    internal readonly ResultFlags _resultFlags;

    /// <summary>The buffer supplied at construction.</summary>
    public ReadOnlySequence<byte> Buffer => _resultBuffer;

    /// <summary>True when the <c>Canceled</c> flag is set.</summary>
    public bool IsCanceled => (_resultFlags & ResultFlags.Canceled) != ResultFlags.None;

    /// <summary>True when the <c>Completed</c> flag is set.</summary>
    public bool IsCompleted => (_resultFlags & ResultFlags.Completed) != ResultFlags.None;

    /// <summary>Stores the buffer and packs the two booleans into a single flags value.</summary>
    public ReadResult(ReadOnlySequence<byte> buffer, bool isCanceled, bool isCompleted)
    {
        ResultFlags flags = ResultFlags.None;
        if (isCompleted)
        {
            flags |= ResultFlags.Completed;
        }

        if (isCanceled)
        {
            flags |= ResultFlags.Canceled;
        }

        _resultBuffer = buffer;
        _resultFlags = flags;
    }
}

[Flags]
internal enum ResultFlags : byte
{
    None = 0,
    Canceled = 1,
    Completed = 2
}

/// <summary>A <see cref="PipeReader"/> over a fixed, in-memory <see cref="ReadOnlySequence{T}"/>.</summary>
internal sealed class SequencePipeReader : PipeReader
{
    private ReadOnlySequence<byte> _sequence;

    private bool _isReaderCompleted;

    // Set to 1 by CancelPendingRead.
    private int _cancelNext;

    public SequencePipeReader(ReadOnlySequence<byte> sequence)
    {
        _sequence = sequence;
    }

    public override void AdvanceTo(SequencePosition consumed) => AdvanceTo(consumed, consumed);

    /// <summary>
    /// Drops everything before <paramref name="consumed"/>; <paramref name="examined"/> is not used
    /// by this implementation.
    /// </summary>
    public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
    {
        ThrowIfCompleted();

        _sequence = consumed.Equals(_sequence.End)
            ? ReadOnlySequence<byte>.Empty
            : _sequence.Slice(consumed);
    }

    public override void CancelPendingRead() => Interlocked.Exchange(ref _cancelNext, 1);

    public override void Complete(Exception?
exception = null) { if (!_isReaderCompleted) { _isReaderCompleted = true; _sequence = ReadOnlySequence<byte>.Empty; } }

    /// <summary>
    /// Completes synchronously: returns what <see cref="TryRead"/> produced, or an empty,
    /// completed result when there is nothing to read.
    /// </summary>
    public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        if (!TryRead(out ReadResult result))
        {
            result = new ReadResult(ReadOnlySequence<byte>.Empty, isCanceled: false, isCompleted: true);
        }

        return new ValueTask<ReadResult>(result);
    }

    /// <summary>
    /// Produces a result when a cancellation was requested or the sequence is non-empty.
    /// The pending-cancel flag is reset by this call.
    /// </summary>
    public override bool TryRead(out ReadResult result)
    {
        ThrowIfCompleted();

        bool cancelRequested = Interlocked.Exchange(ref _cancelNext, 0) == 1;
        if (!cancelRequested && _sequence.Length <= 0)
        {
            result = default;
            return false;
        }

        result = new ReadResult(_sequence, cancelRequested, isCompleted: true);
        return true;
    }

    private void ThrowIfCompleted()
    {
        if (!_isReaderCompleted)
        {
            return;
        }

        ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
    }
}

public static class StreamPipeExtensions
{
    /// <summary>
    /// Copies <paramref name="source"/> into <paramref name="destination"/> by calling the writer's
    /// <c>CopyFromAsync</c>. Returns a canceled task without copying when the token is already canceled.
    /// </summary>
    public static Task CopyToAsync(this Stream source, PipeWriter destination, CancellationToken cancellationToken = default(CancellationToken))
    {
        if (source == null)
        {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.source);
        }

        if (destination == null)
        {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.destination);
        }

        return cancellationToken.IsCancellationRequested
            ? Task.FromCanceled(cancellationToken)
            : destination.CopyFromAsync(source, cancellationToken);
    }
}

internal sealed class StreamPipeReader : PipeReader
{
    internal const int InitialSegmentPoolSize = 4;

    internal const int MaxSegmentPoolSize = 256;

    // Lazily created and cleared under _lock.
    private CancellationTokenSource _internalTokenSource;

    private bool _isReaderCompleted;

    private bool _isStreamCompleted;

    private BufferSegment _readHead;

    private int _readIndex;

    private BufferSegment _readTail;

    private long _bufferedBytes;

    private bool _examinedEverything;

    private readonly object _lock = new object();

    private BufferSegmentStack _bufferSegmentPool;

    private readonly StreamPipeReaderOptions _options;

    private bool LeaveOpen =>
_options.LeaveOpen;

    private bool UseZeroByteReads => _options.UseZeroByteReads;

    private int BufferSize => _options.BufferSize;

    private int MaxBufferSize => _options.MaxBufferSize;

    private int MinimumReadThreshold => _options.MinimumReadSize;

    private MemoryPool<byte> Pool => _options.Pool;

    public Stream InnerStream { get; }

    /// <summary>Returns the current internal token source, creating one under the lock if there is none.</summary>
    private CancellationTokenSource InternalTokenSource
    {
        get
        {
            lock (_lock)
            {
                if (_internalTokenSource == null)
                {
                    _internalTokenSource = new CancellationTokenSource();
                }

                return _internalTokenSource;
            }
        }
    }

    public StreamPipeReader(Stream readingStream, StreamPipeReaderOptions options)
    {
        if (readingStream == null)
        {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.readingStream);
        }

        if (options == null)
        {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
        }

        InnerStream = readingStream;
        _options = options;
        _bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
    }

    public override void AdvanceTo(SequencePosition consumed) => AdvanceTo(consumed, consumed);

    /// <summary>Unpacks both positions into (segment, index) pairs and forwards to the private overload.</summary>
    public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
    {
        ThrowIfCompleted();

        BufferSegment consumedSegment = (BufferSegment)consumed.GetObject();
        int consumedIndex = consumed.GetInteger();
        BufferSegment examinedSegment = (BufferSegment)examined.GetObject();
        int examinedIndex = examined.GetInteger();

        AdvanceTo(consumedSegment, consumedIndex, examinedSegment, examinedIndex);
    }

    private void AdvanceTo(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
    {
        if (consumedSegment != null && examinedSegment != null)
        {
            if (_readHead == null) { ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor(); }
            BufferSegment bufferSegment = _readHead;
            BufferSegment bufferSegment2 = consumedSegment;
            long length = BufferSegment.GetLength(bufferSegment, _readIndex, consumedSegment, consumedIndex);
            _bufferedBytes -= length;
            _examinedEverything = false;
            if (examinedSegment == _readTail) { _examinedEverything = examinedIndex == _readTail.End; }
            if (_bufferedBytes == 0L)
            {
                bufferSegment2 = null;
                _readHead = null;
                _readTail = null;
                _readIndex = 0;
            }
            else if (consumedIndex == bufferSegment2.Length)
            {
                BufferSegment bufferSegment3
= (_readHead = bufferSegment2.NextSegment);
                _readIndex = 0;
                bufferSegment2 = bufferSegment3;
            }
            else
            {
                _readHead = consumedSegment;
                _readIndex = consumedIndex;
            }
            // Hand back every segment from the old head up to (not including) the new head.
            while (bufferSegment != bufferSegment2)
            {
                BufferSegment? nextSegment = bufferSegment.NextSegment;
                ReturnSegmentUnsynchronized(bufferSegment);
                bufferSegment = nextSegment;
            }
        }
    }

    public override void CancelPendingRead() => InternalTokenSource.Cancel();

    /// <summary>Marks the reader completed and disposes the inner stream when required.</summary>
    public override void Complete(Exception? exception = null)
    {
        bool disposeInnerStream = CompleteAndGetNeedsDispose();
        if (disposeInnerStream)
        {
            InnerStream.Dispose();
        }
    }

    /// <summary>
    /// On the first call, flags the reader as completed and resets every segment in the read chain.
    /// </summary>
    /// <returns>
    /// <c>false</c> if the reader was already completed; otherwise the negation of <c>LeaveOpen</c>.
    /// </returns>
    private bool CompleteAndGetNeedsDispose()
    {
        if (_isReaderCompleted)
        {
            return false;
        }

        _isReaderCompleted = true;

        for (BufferSegment current = _readHead; current != null;)
        {
            // Capture the successor before Reset() is called on the current segment.
            BufferSegment next = current.NextSegment;
            current.Reset();
            current = next;
        }

        return !LeaveOpen;
    }

    public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken))
        => ReadInternalAsync(null, cancellationToken);

    protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken)
        => ReadInternalAsync(minimumSize, cancellationToken);

    private ValueTask<ReadResult> ReadInternalAsync(int?
minimumSize, CancellationToken cancellationToken)
    {
        ThrowIfCompleted();
        // An already-canceled caller token short-circuits to a canceled task.
        if (cancellationToken.IsCancellationRequested) { return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(cancellationToken)); }
        CancellationTokenSource internalTokenSource = InternalTokenSource;
        // Synchronous path: use whatever TryReadInternal yields, provided no minimum was requested,
        // the minimum is already met, or the result is completed/canceled.
        if (TryReadInternal(internalTokenSource, out var result) && (!minimumSize.HasValue || result.Buffer.Length >= minimumSize || result.IsCompleted || result.IsCanceled)) { return new ValueTask<ReadResult>(result); }
        // The stream already reported end-of-data: return an empty, completed result.
        if (_isStreamCompleted) { return new ValueTask<ReadResult>(new ReadResult(default(ReadOnlySequence<byte>), isCanceled: false, isCompleted: true)); }
        return Core(this, minimumSize, internalTokenSource, cancellationToken);

        // Asynchronous path. Static local function: all state is passed in via 'reader'.
        static async ValueTask<ReadResult> Core(StreamPipeReader reader, int? minimumSize, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
        {
            CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
            // Route the caller's token to reader.Cancel(); the stream reads below observe tokenSource.Token.
            if (cancellationToken.CanBeCanceled) { cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, delegate(object state) { ((StreamPipeReader)state).Cancel(); }, reader); }
            using (cancellationTokenRegistration)
            {
                bool isCanceled = false;
                try
                {
                    // Optional zero-byte read issued before any tail buffer is allocated, only when nothing is buffered.
                    if (reader.UseZeroByteReads && reader._bufferedBytes == 0L) { await StreamExtensions.ReadAsync(reader.InnerStream, Memory<byte>.Empty, tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false); }
                    do
                    {
                        reader.AllocateReadTail(minimumSize);
                        // Read into the unused part of the tail segment.
                        Memory<byte> buffer = reader._readTail.AvailableMemory.Slice(reader._readTail.End);
                        int num = await StreamExtensions.ReadAsync(reader.InnerStream, buffer, tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false);
                        reader._readTail.End += num;
                        reader._bufferedBytes += num;
                        // A zero-length read marks the stream as completed and ends the loop.
                        if (num == 0) { reader._isStreamCompleted = true; break; }
                    }
                    // Keep reading only while a minimum was requested and has not been reached.
                    while (minimumSize.HasValue && reader._bufferedBytes < minimumSize);
                }
                catch (OperationCanceledException ex)
                {
                    // Drop the internal token source so the next access creates a fresh one.
                    reader.ClearCancellationToken();
                    if
(cancellationToken.IsCancellationRequested) { throw new OperationCanceledException(ex.Message, ex, cancellationToken); }
                    // Cancellation that came from neither the caller's token nor the internal source is rethrown as-is.
                    if (!tokenSource.IsCancellationRequested) { throw; }
                    // Internal cancellation is reported through the result instead of an exception.
                    isCanceled = true;
                }
                return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, reader._isStreamCompleted);
            }
        }
    }

    // Copies the buffered segments, then the rest of the inner stream, into the given PipeWriter.
    // The caller's token is routed to Cancel(); the writes observe the internal token source's token.
    public override async Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default(CancellationToken))
    {
        ThrowIfCompleted();
        CancellationTokenSource tokenSource = InternalTokenSource;
        if (tokenSource.IsCancellationRequested) { ThrowHelper.ThrowOperationCanceledException_ReadCanceled(); }
        CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
        if (cancellationToken.CanBeCanceled) { cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, delegate(object state) { ((StreamPipeReader)state).Cancel(); }, this); }
        using (cancellationTokenRegistration)
        {
            // No-op statement; looks like a decompiler artifact.
            _ = 1;
            try
            {
                BufferSegment segment = _readHead;
                int start = _readIndex;
                try
                {
                    // Write each buffered segment; only the first one starts at a non-zero offset.
                    while (segment != null)
                    {
                        FlushResult flushResult = await destination.WriteAsync(segment.Memory.Slice(start), tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false);
                        if (flushResult.IsCanceled) { ThrowHelper.ThrowOperationCanceledException_FlushCanceled(); }
                        segment = segment.NextSegment;
                        start = 0;
                        // Stop as soon as a flush reports completion.
                        if (flushResult.IsCompleted) { return; }
                    }
                }
                finally
                {
                    // Runs on every exit path; advances only when a segment is still pending
                    // (early return or exception), marking it fully consumed and examined.
                    if (segment != null) { AdvanceTo(segment, segment.End, segment, segment.End); }
                }
                if (_isStreamCompleted) { return; }
                // Buffered data has been written; copy the remainder of the inner stream.
                await InnerStream.CopyToAsync(destination, tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false);
            }
            catch (OperationCanceledException)
            {
                // Reset the internal token source before propagating the cancellation.
                ClearCancellationToken();
                throw;
            }
        }
    }

    // Stream-destination counterpart of the overload above.
    public override async Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default(CancellationToken))
    {
        ThrowIfCompleted();
        CancellationTokenSource tokenSource = InternalTokenSource;
        if (tokenSource.IsCancellationRequested) {
ThrowHelper.ThrowOperationCanceledException_ReadCanceled(); }
        CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
        if (cancellationToken.CanBeCanceled) { cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, delegate(object state) { ((StreamPipeReader)state).Cancel(); }, this); }
        using (cancellationTokenRegistration)
        {
            _ = 1;
            try
            {
                BufferSegment segment = _readHead;
                int start = _readIndex;
                try
                {
                    while (segment != null)
                    {
                        await StreamExtensions.WriteAsync(destination, segment.Memory.Slice(start), tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false);
                        segment = segment.NextSegment;
                        start = 0;
                    }
                }
                finally
                {
                    if (segment != null) { AdvanceTo(segment, segment.End, segment, segment.End); }
                }
                if (_isStreamCompleted) { return; }
                await StreamExtensions.CopyToAsync(InnerStream, destination, tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false);
            }
            catch (OperationCanceledException)
            {
                ClearCancellationToken();
                throw;
            }
        }
    }

    /// <summary>Discards the internal token source (under the lock) so a new one is created on next access.</summary>
    private void ClearCancellationToken()
    {
        lock (_lock)
        {
            _internalTokenSource = null;
        }
    }

    private void ThrowIfCompleted()
    {
        if (!_isReaderCompleted)
        {
            return;
        }

        ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
    }

    public override bool TryRead(out ReadResult result)
    {
        ThrowIfCompleted();

        CancellationTokenSource tokenSource = InternalTokenSource;
        return TryReadInternal(tokenSource, out result);
    }

    /// <summary>
    /// Produces a result without touching the stream when <paramref name="source"/> is canceled,
    /// or when bytes are buffered and either not everything was examined or the stream has completed.
    /// </summary>
    private bool TryReadInternal(CancellationTokenSource source, out ReadResult result)
    {
        bool canceled = source.IsCancellationRequested;
        bool hasReadableData = _bufferedBytes > 0 && (!_examinedEverything || _isStreamCompleted);

        if (!canceled && !hasReadableData)
        {
            result = default;
            return false;
        }

        if (canceled)
        {
            // The cancellation is being reported now; start over with a fresh token source.
            ClearCancellationToken();
        }

        ReadOnlySequence<byte> buffered = GetCurrentReadOnlySequence();
        result = new ReadResult(buffered, canceled, _isStreamCompleted);
        return true;
    }

    private ReadOnlySequence<byte> GetCurrentReadOnlySequence() { if
(_readHead != null) { return new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End); }
        return default(ReadOnlySequence<byte>);
    }

    /// <summary>
    /// Makes sure there is a tail segment to read into: creates the first segment when the chain is
    /// empty, or appends a new one when the tail has fewer writable bytes than the minimum read threshold.
    /// </summary>
    private void AllocateReadTail(int? minimumSize = null)
    {
        if (_readHead == null)
        {
            BufferSegment first = AllocateSegment(minimumSize);
            _readHead = first;
            _readTail = first;
            return;
        }

        if (_readTail.WritableBytes < MinimumReadThreshold)
        {
            BufferSegment appended = AllocateSegment(minimumSize);
            _readTail.SetNext(appended);
            _readTail = appended;
        }
    }

    /// <summary>
    /// Obtains a segment and backs it with memory from the configured pool when the requested size
    /// fits the pool's maximum, otherwise with an array rented from <see cref="ArrayPool{T}.Shared"/>.
    /// </summary>
    private BufferSegment AllocateSegment(int? minimumSize = null)
    {
        BufferSegment segment = CreateSegmentUnsynchronized();

        int requestedSize = minimumSize ?? BufferSize;
        // -1 means the default shared pool is configured, so the pool branch below is never taken.
        int poolLimit = _options.IsDefaultSharedMemoryPool ? -1 : _options.Pool.MaxBufferSize;

        if (requestedSize > poolLimit)
        {
            int arraySize = GetSegmentSize(requestedSize, MaxBufferSize);
            segment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(arraySize));
        }
        else
        {
            int pooledSize = GetSegmentSize(requestedSize, poolLimit);
            segment.SetOwnedMemory(_options.Pool.Rent(pooledSize));
        }

        return segment;
    }

    /// <summary>Raises the hint to at least <c>BufferSize</c>, then caps it at <paramref name="maxBufferSize"/>.</summary>
    private int GetSegmentSize(int sizeHint, int maxBufferSize)
    {
        int atLeastBufferSize = Math.Max(BufferSize, sizeHint);
        return Math.Min(maxBufferSize, atLeastBufferSize);
    }

    private BufferSegment CreateSegmentUnsynchronized()
    {
        return _bufferSegmentPool.TryPop(out BufferSegment pooled) ? pooled : new BufferSegment();
    }

    /// <summary>Resets the segment and keeps it for reuse unless the segment pool is already at its cap.</summary>
    private void ReturnSegmentUnsynchronized(BufferSegment segment)
    {
        segment.Reset();

        if (_bufferSegmentPool.Count >= MaxSegmentPoolSize)
        {
            return;
        }

        _bufferSegmentPool.Push(segment);
    }

    private void Cancel() => InternalTokenSource.Cancel();
}

public class StreamPipeReaderOptions
{
    private const int DefaultBufferSize = 4096;

    internal const int DefaultMaxBufferSize = 2097152;

    private const int DefaultMinimumReadSize = 1024;

    internal static readonly StreamPipeReaderOptions s_default = new StreamPipeReaderOptions();

    public int BufferSize { get; }

    internal int MaxBufferSize { get; } = DefaultMaxBufferSize;

    public int MinimumReadSize { get; }

    public MemoryPool<byte> Pool { get; }

    public bool LeaveOpen
{ get; }

    public bool UseZeroByteReads { get; }

    internal bool IsDefaultSharedMemoryPool { get; }

    /// <summary>Four-argument overload; forwards with zero-byte reads turned off.</summary>
    public StreamPipeReaderOptions(MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen)
        : this(pool, bufferSize, minimumReadSize, leaveOpen, useZeroByteReads: false)
    {
    }

    /// <summary>
    /// Builds the options. A <c>null</c> pool selects <see cref="MemoryPool{T}.Shared"/>; a size of -1
    /// selects the corresponding default.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="bufferSize"/> or <paramref name="minimumReadSize"/> is zero or negative (other than -1).
    /// </exception>
    public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false)
    {
        Pool = pool ?? MemoryPool<byte>.Shared;
        IsDefaultSharedMemoryPool = Pool == MemoryPool<byte>.Shared;

        if (bufferSize == -1)
        {
            BufferSize = DefaultBufferSize;
        }
        else if (bufferSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(bufferSize));
        }
        else
        {
            BufferSize = bufferSize;
        }

        if (minimumReadSize == -1)
        {
            MinimumReadSize = DefaultMinimumReadSize;
        }
        else if (minimumReadSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(minimumReadSize));
        }
        else
        {
            MinimumReadSize = minimumReadSize;
        }

        LeaveOpen = leaveOpen;
        UseZeroByteReads = useZeroByteReads;
    }
}

internal sealed class StreamPipeWriter : PipeWriter
{
    internal const int InitialSegmentPoolSize = 4;

    internal const int MaxSegmentPoolSize = 256;

    private readonly int _minimumBufferSize;

    private BufferSegment _head;

    private BufferSegment _tail;

    private Memory<byte> _tailMemory;

    private int _tailBytesBuffered;

    private int _bytesBuffered;

    private readonly MemoryPool<byte> _pool;

    private readonly int _maxPooledBufferSize;

    private CancellationTokenSource _internalTokenSource;

    private bool _isCompleted;

    private readonly object _lockObject = new object();

    private BufferSegmentStack _bufferSegmentPool;

    private readonly bool _leaveOpen;

    private CancellationTokenSource InternalTokenSource { get { lock (_lockObject) { return _internalTokenSource ??
(_internalTokenSource = new CancellationTokenSource()); } } }

    public Stream InnerStream { get; }

    public override bool CanGetUnflushedBytes => true;

    public override long UnflushedBytes => _bytesBuffered;

    public StreamPipeWriter(Stream writingStream, StreamPipeWriterOptions options)
    {
        if (writingStream == null)
        {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.writingStream);
        }

        if (options == null)
        {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
        }

        InnerStream = writingStream;
        _minimumBufferSize = options.MinimumBufferSize;

        // The shared pool is represented as "no pool"; such writers get a pooled limit of -1.
        if (options.Pool == MemoryPool<byte>.Shared)
        {
            _pool = null;
        }
        else
        {
            _pool = options.Pool;
        }

        _maxPooledBufferSize = _pool == null ? -1 : _pool.MaxBufferSize;
        _bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
        _leaveOpen = options.LeaveOpen;
    }

    /// <summary>Records that <paramref name="bytes"/> bytes were written at the start of the current tail memory.</summary>
    public override void Advance(int bytes)
    {
        // The unsigned comparison also rejects negative values.
        if ((uint)bytes > (uint)_tailMemory.Length)
        {
            ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
        }

        _bytesBuffered += bytes;
        _tailBytesBuffered += bytes;
        _tailMemory = _tailMemory.Slice(bytes);
    }

    /// <summary>Returns the current tail memory after making room for <paramref name="sizeHint"/> bytes.</summary>
    public override Memory<byte> GetMemory(int sizeHint = 0)
    {
        if (_isCompleted)
        {
            ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
        }

        if (sizeHint < 0)
        {
            ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
        }

        AllocateMemory(sizeHint);
        Memory<byte> writable = _tailMemory;
        return writable;
    }

    /// <summary>Span-returning counterpart of <see cref="GetMemory"/>, with the same checks.</summary>
    public override Span<byte> GetSpan(int sizeHint = 0)
    {
        if (_isCompleted)
        {
            ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
        }

        if (sizeHint < 0)
        {
            ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
        }

        AllocateMemory(sizeHint);
        Memory<byte> writable = _tailMemory;
        return writable.Span;
    }

    private void AllocateMemory(int sizeHint)
    {
        if (_head == null)
        {
            BufferSegment tail = AllocateSegment(sizeHint);
            _head = (_tail = tail);
            _tailBytesBuffered = 0;
            return;
        }
        int length = _tailMemory.Length;
        if (length == 0 || length < sizeHint)
        {
            // Commit the bytes written so far to the current tail before chaining a new segment.
            if (_tailBytesBuffered > 0) { _tail.End += _tailBytesBuffered; _tailBytesBuffered = 0; }
            BufferSegment bufferSegment = AllocateSegment(sizeHint);
_tail.SetNext(bufferSegment); _tail = bufferSegment; } }

    /// <summary>
    /// Obtains a segment, backs it with pool memory when the hint fits the pooled limit (otherwise with a
    /// rented array), and points the tail memory at the new segment's available memory.
    /// </summary>
    private BufferSegment AllocateSegment(int sizeHint)
    {
        BufferSegment segment = CreateSegmentUnsynchronized();
        int poolLimit = _maxPooledBufferSize;

        if (sizeHint > poolLimit)
        {
            int arraySize = GetSegmentSize(sizeHint);
            segment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(arraySize));
        }
        else
        {
            segment.SetOwnedMemory(_pool.Rent(GetSegmentSize(sizeHint, poolLimit)));
        }

        _tailMemory = segment.AvailableMemory;
        return segment;
    }

    /// <summary>Raises the hint to at least the minimum buffer size, then caps it at <paramref name="maxBufferSize"/>.</summary>
    private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
    {
        int atLeastMinimum = Math.Max(_minimumBufferSize, sizeHint);
        return Math.Min(maxBufferSize, atLeastMinimum);
    }

    private BufferSegment CreateSegmentUnsynchronized()
    {
        return _bufferSegmentPool.TryPop(out BufferSegment pooled) ? pooled : new BufferSegment();
    }

    /// <summary>Resets the segment and keeps it for reuse unless the segment pool is already at its cap.</summary>
    private void ReturnSegmentUnsynchronized(BufferSegment segment)
    {
        segment.Reset();

        if (_bufferSegmentPool.Count >= MaxSegmentPoolSize)
        {
            return;
        }

        _bufferSegmentPool.Push(segment);
    }

    public override void CancelPendingFlush() => Cancel();

    /// <summary>
    /// Completes the writer once. Calls <c>FlushInternal</c> with <c>true</c> only when no exception
    /// was supplied, then disposes the internal token source and, unless left open, the inner stream.
    /// </summary>
    public override void Complete(Exception? exception = null)
    {
        if (_isCompleted)
        {
            return;
        }

        _isCompleted = true;

        try
        {
            bool writeToStream = exception == null;
            FlushInternal(writeToStream);
        }
        finally
        {
            _internalTokenSource?.Dispose();

            if (!_leaveOpen)
            {
                InnerStream.Dispose();
            }
        }
    }

    public override async ValueTask CompleteAsync(Exception?
exception = null)
    {
        if (_isCompleted) { return; }
        _isCompleted = true;
        try
        {
            await FlushAsyncInternal(exception == null, Memory<byte>.Empty).ConfigureAwait(continueOnCapturedContext: false);
        }
        finally
        {
            _internalTokenSource?.Dispose();
            if (!_leaveOpen) { InnerStream.Dispose(); }
        }
    }

    /// <summary>
    /// Flushes buffered bytes to the inner stream. With nothing buffered, returns a non-canceled,
    /// non-completed result immediately.
    /// </summary>
    public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        return _bytesBuffered == 0
            ? new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: false))
            : FlushAsyncInternal(writeToStream: true, Memory<byte>.Empty, cancellationToken);
    }

    /// <summary>Flushes, passing <paramref name="source"/> along as the data argument of the internal flush.</summary>
    public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
        => FlushAsyncInternal(writeToStream: true, source, cancellationToken);

    private void Cancel() => InternalTokenSource.Cancel();

    private async ValueTask<FlushResult> FlushAsyncInternal(bool writeToStream, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default(CancellationToken))
    {
        CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
        if (cancellationToken.CanBeCanceled) { cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, delegate(object state) { ((StreamPipeWriter)state).Cancel(); }, this); }
        if (_tailBytesBuffered > 0) { _tail.End += _tailBytesBuffered; _tailBytesBuffered = 0; }
        using (cancellationTokenRegistration)
        {
            CancellationToken localToken = InternalTokenSource.Token;
            try
            {
                BufferSegment segment = _head;
                while (segment != null)
                {
                    BufferSegment returnSegment = segment;
                    segment = segment.NextSegment;
                    if (returnSegment.Length > 0 && writeToStream) { await StreamExtensions.WriteAsync(InnerStream, returnSegment.Memory, localToken).ConfigureAwait(continueOnCapturedContext: false); }
                    ReturnSegmentUnsynchronized(returnSegment);
                    _head = segment;
                }
                if (writeToStream)
                {
                    if (data.Length > 0)
                    {
                        await
StreamExtensions.WriteAsync(InnerStream, data, localToken).ConfigureAwait(continueOnCapturedContext: false); } if (_bytesBuffered > 0 || data.Length > 0) { await InnerStream.FlushAsync(localToken).ConfigureAwait(continueOnCapturedContext: false); } } _head = null; _tail = null; _tailMemory