Please disclose if any significant portion of your mod was created using AI tools by adding the 'AI Generated' category. Failing to do so may result in the mod being removed from Thunderstore.
Decompiled source of System IO Pipelines v10.0.700
BepInEx/core/System.IO.Pipelines/netstandard2.0/System.IO.Pipelines.dll
Decompiled a week ago
The result has been truncated due to the large size, download it to view full contents!
using System; using System.Buffers; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Reflection; using System.Resources; using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Runtime.InteropServices; using System.Runtime.Versioning; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Sources; using FxResources.System.IO.Pipelines; using Microsoft.CodeAnalysis; [assembly: CompilationRelaxations(8)] [assembly: RuntimeCompatibility(WrapNonExceptionThrows = true)] [assembly: Debuggable(DebuggableAttribute.DebuggingModes.IgnoreSymbolStoreSequencePoints)] [assembly: InternalsVisibleTo("System.IO.Pipelines.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001004b86c4cb78549b34bab61a3b1800e23bfeb5b3ec390074041536a7e3cbd97f5f04cf0f857155a8928eaa29ebfd11cfbbad3ba70efea7bda3226c6a8d370a4cd303f714486b6ebc225985a638471e6ef571cc92a4613c00b8fa65d61ccee0cbe5f36330c9a01f4183559f1bef24cc2917c6d913e3a541333a1d05d9bed22b38cb")] [assembly: TargetFramework(".NETStandard,Version=v2.0", FrameworkDisplayName = ".NET Standard 2.0")] [assembly: AssemblyMetadata("Serviceable", "True")] [assembly: AssemblyMetadata("PreferInbox", "True")] [assembly: AssemblyDefaultAlias("System.IO.Pipelines")] [assembly: NeutralResourcesLanguage("en-US")] [assembly: CLSCompliant(true)] [assembly: AssemblyMetadata("IsTrimmable", "True")] [assembly: AssemblyMetadata("IsAotCompatible", "True")] [assembly: DefaultDllImportSearchPaths(DllImportSearchPath.System32 | DllImportSearchPath.AssemblyDirectory)] [assembly: AssemblyCompany("Microsoft Corporation")] [assembly: AssemblyCopyright("© Microsoft Corporation. All rights reserved.")] [assembly: AssemblyDescription("Single producer single consumer byte buffer management.\r\n\r\nCommonly Used Types:\r\nSystem.IO.Pipelines.Pipe\r\nSystem.IO.Pipelines.PipeWriter\r\nSystem.IO.Pipelines.PipeReader")] [assembly: AssemblyFileVersion("10.0.726.21808")] [assembly: AssemblyInformationalVersion("10.0.7+b16286c2284fecf303dbc12a0bb152476d662e44")] [assembly: AssemblyProduct("Microsoft® .NET")] [assembly: AssemblyTitle("System.IO.Pipelines")] [assembly: AssemblyMetadata("RepositoryUrl", "https://github.com/dotnet/dotnet")] [assembly: AssemblyVersion("10.0.0.0")] [module: RefSafetyRules(11)] [module: System.Runtime.CompilerServices.NullablePublicOnly(true)] namespace Microsoft.CodeAnalysis { [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] internal sealed class EmbeddedAttribute : Attribute { } } namespace System.Runtime.CompilerServices { [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] internal sealed class IsReadOnlyAttribute : Attribute { } [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] [AttributeUsage(AttributeTargets.Class | AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Event | AttributeTargets.Parameter | AttributeTargets.ReturnValue | AttributeTargets.GenericParameter, AllowMultiple = false, Inherited = false)] internal sealed class NullableAttribute : Attribute { public readonly byte[] NullableFlags; public NullableAttribute(byte P_0) { NullableFlags = new byte[1] { P_0 }; } public NullableAttribute(byte[] P_0) { NullableFlags = P_0; } } [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] [AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct | AttributeTargets.Method | AttributeTargets.Interface | AttributeTargets.Delegate, AllowMultiple = false, Inherited = false)] internal sealed class NullableContextAttribute : Attribute { public readonly byte Flag; public NullableContextAttribute(byte P_0) { Flag = P_0; } } [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] [AttributeUsage(AttributeTargets.Module, AllowMultiple = false, Inherited = false)] internal sealed class NullablePublicOnlyAttribute : Attribute { public readonly bool IncludesInternals; public NullablePublicOnlyAttribute(bool P_0) { IncludesInternals = P_0; } } [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] [AttributeUsage(AttributeTargets.Module, AllowMultiple = false, Inherited = false)] internal sealed class RefSafetyRulesAttribute : Attribute { public readonly int Version; public RefSafetyRulesAttribute(int P_0) { Version = P_0; } } [CompilerGenerated] [Microsoft.CodeAnalysis.Embedded] [AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct | AttributeTargets.Enum | AttributeTargets.Method | AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Event | AttributeTargets.Interface | AttributeTargets.Delegate, AllowMultiple = false, Inherited = false)] internal sealed class ExtensionMarkerAttribute : Attribute { public ExtensionMarkerAttribute(string name) { } } } namespace FxResources.System.IO.Pipelines { internal static class SR { } } namespace System { internal static class SR { private static readonly bool s_usingResourceKeys = GetUsingResourceKeysSwitchValue(); private static ResourceManager s_resourceManager; internal static ResourceManager ResourceManager => s_resourceManager ?? (s_resourceManager = new ResourceManager(typeof(SR))); internal static string AdvanceToInvalidCursor => GetResourceString("AdvanceToInvalidCursor"); internal static string ArgumentOutOfRange_NeedPosNum => GetResourceString("ArgumentOutOfRange_NeedPosNum"); internal static string ConcurrentOperationsNotSupported => GetResourceString("ConcurrentOperationsNotSupported"); internal static string FlushCanceledOnPipeWriter => GetResourceString("FlushCanceledOnPipeWriter"); internal static string GetResultBeforeCompleted => GetResourceString("GetResultBeforeCompleted"); internal static string InvalidExaminedOrConsumedPosition => GetResourceString("InvalidExaminedOrConsumedPosition"); internal static string InvalidExaminedPosition => GetResourceString("InvalidExaminedPosition"); internal static string InvalidZeroByteRead => GetResourceString("InvalidZeroByteRead"); internal static string ObjectDisposed_StreamClosed => GetResourceString("ObjectDisposed_StreamClosed"); internal static string NoReadingOperationToComplete => GetResourceString("NoReadingOperationToComplete"); internal static string NotSupported_UnreadableStream => GetResourceString("NotSupported_UnreadableStream"); internal static string NotSupported_UnwritableStream => GetResourceString("NotSupported_UnwritableStream"); internal static string ReadCanceledOnPipeReader => GetResourceString("ReadCanceledOnPipeReader"); internal static string ReaderAndWriterHasToBeCompleted => GetResourceString("ReaderAndWriterHasToBeCompleted"); internal static string ReadingAfterCompleted => GetResourceString("ReadingAfterCompleted"); internal static string ReadingIsInProgress => GetResourceString("ReadingIsInProgress"); internal static string WritingAfterCompleted => GetResourceString("WritingAfterCompleted"); internal static string UnflushedBytesNotSupported => GetResourceString("UnflushedBytesNotSupported"); private static bool GetUsingResourceKeysSwitchValue() { if (!AppContext.TryGetSwitch("System.Resources.UseSystemResourceKeys", out var isEnabled)) { return false; } return isEnabled; } internal static bool UsingResourceKeys() { return s_usingResourceKeys; } private static string GetResourceString(string resourceKey) { if (UsingResourceKeys()) { return resourceKey; } string result = null; try { result = ResourceManager.GetString(resourceKey); } catch (MissingManifestResourceException) { } return result; } private static string GetResourceString(string resourceKey, string defaultString) { string resourceString = GetResourceString(resourceKey); if (!(resourceKey == resourceString) && resourceString != null) { return resourceString; } return defaultString; } internal static string Format(string resourceFormat, object? p1) { if (UsingResourceKeys()) { return string.Join(", ", resourceFormat, p1); } return string.Format(resourceFormat, p1); } internal static string Format(string resourceFormat, object? p1, object? p2) { if (UsingResourceKeys()) { return string.Join(", ", resourceFormat, p1, p2); } return string.Format(resourceFormat, p1, p2); } internal static string Format(string resourceFormat, object? p1, object? p2, object? p3) { if (UsingResourceKeys()) { return string.Join(", ", resourceFormat, p1, p2, p3); } return string.Format(resourceFormat, p1, p2, p3); } internal static string Format(string resourceFormat, params object?[]? args) { if (args != null) { if (UsingResourceKeys()) { return resourceFormat + ", " + string.Join(", ", args); } return string.Format(resourceFormat, args); } return resourceFormat; } internal static string Format(IFormatProvider? provider, string resourceFormat, object? p1) { if (UsingResourceKeys()) { return string.Join(", ", resourceFormat, p1); } return string.Format(provider, resourceFormat, p1); } internal static string Format(IFormatProvider? provider, string resourceFormat, object? p1, object? p2) { if (UsingResourceKeys()) { return string.Join(", ", resourceFormat, p1, p2); } return string.Format(provider, resourceFormat, p1, p2); } internal static string Format(IFormatProvider? provider, string resourceFormat, object? p1, object? p2, object? p3) { if (UsingResourceKeys()) { return string.Join(", ", resourceFormat, p1, p2, p3); } return string.Format(provider, resourceFormat, p1, p2, p3); } internal static string Format(IFormatProvider? provider, string resourceFormat, params object?[]? args) { if (args != null) { if (UsingResourceKeys()) { return resourceFormat + ", " + string.Join(", ", args); } return string.Format(provider, resourceFormat, args); } return resourceFormat; } } internal static class ExceptionPolyfills { [SpecialName] public sealed class <G>$E6188BA5B951F1F7AA9135E0EBB76F2B { [SpecialName] public static class <M>$96F0261AC622664B8B003966835C0332 { } [ExtensionMarker("<M>$96F0261AC622664B8B003966835C0332")] public static void ThrowIfNull([NotNull] object? argument, [CallerArgumentExpression("argument")] string? paramName = null) { throw null; } } [SpecialName] public sealed class <G>$3F30F31B33543D5FB8E174FB4FD780B9 { [SpecialName] public static class <M>$1F10CFA08738E6D8AF61CBECC6763DBC { } [ExtensionMarker("<M>$1F10CFA08738E6D8AF61CBECC6763DBC")] public static void ThrowIf([DoesNotReturnIf(true)] bool condition, object instance) { throw null; } [ExtensionMarker("<M>$1F10CFA08738E6D8AF61CBECC6763DBC")] public static void ThrowIf([DoesNotReturnIf(true)] bool condition, Type type) { throw null; } } public static void ThrowIfNull([NotNull] object? argument, [CallerArgumentExpression("argument")] string? paramName = null) { if (argument == null) { ThrowArgumentNullException(paramName); } } [DoesNotReturn] private static void ThrowArgumentNullException(string paramName) { throw new ArgumentNullException(paramName); } public static void ThrowIf([DoesNotReturnIf(true)] bool condition, object instance) { if (condition) { ThrowObjectDisposedException(instance); } } public static void ThrowIf([DoesNotReturnIf(true)] bool condition, Type type) { if (condition) { ThrowObjectDisposedException(type); } } [DoesNotReturn] private static void ThrowObjectDisposedException(object instance) { throw new ObjectDisposedException(instance?.GetType().FullName); } [DoesNotReturn] private static void ThrowObjectDisposedException(Type type) { throw new ObjectDisposedException(type?.FullName); } } } namespace System.Diagnostics.CodeAnalysis { [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Parameter, Inherited = false)] internal sealed class AllowNullAttribute : Attribute { } [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Parameter, Inherited = false)] internal sealed class DisallowNullAttribute : Attribute { } [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Parameter | AttributeTargets.ReturnValue, Inherited = false)] internal sealed class MaybeNullAttribute : Attribute { } [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Parameter | AttributeTargets.ReturnValue, Inherited = false)] internal sealed class NotNullAttribute : Attribute { } [AttributeUsage(AttributeTargets.Parameter, Inherited = false)] internal sealed class MaybeNullWhenAttribute : Attribute { public bool ReturnValue { get; } public MaybeNullWhenAttribute(bool returnValue) { ReturnValue = returnValue; } } [AttributeUsage(AttributeTargets.Parameter, Inherited = false)] internal sealed class NotNullWhenAttribute : Attribute { public bool ReturnValue { get; } public NotNullWhenAttribute(bool returnValue) { ReturnValue = returnValue; } } [AttributeUsage(AttributeTargets.Property | AttributeTargets.Parameter | AttributeTargets.ReturnValue, AllowMultiple = true, Inherited = false)] internal sealed class NotNullIfNotNullAttribute : Attribute { public string ParameterName { get; } public NotNullIfNotNullAttribute(string parameterName) { ParameterName = parameterName; } } [AttributeUsage(AttributeTargets.Method, Inherited = false)] internal sealed class DoesNotReturnAttribute : Attribute { } [AttributeUsage(AttributeTargets.Parameter, Inherited = false)] internal sealed class DoesNotReturnIfAttribute : Attribute { public bool ParameterValue { get; } public DoesNotReturnIfAttribute(bool parameterValue) { ParameterValue = parameterValue; } } [AttributeUsage(AttributeTargets.Method | AttributeTargets.Property, Inherited = false, AllowMultiple = true)] internal sealed class MemberNotNullAttribute : Attribute { public string[] Members { get; } public MemberNotNullAttribute(string member) { Members = new string[1] { member }; } public MemberNotNullAttribute(params string[] members) { Members = members; } } [AttributeUsage(AttributeTargets.Method | AttributeTargets.Property, Inherited = false, AllowMultiple = true)] internal sealed class MemberNotNullWhenAttribute : Attribute { public bool ReturnValue { get; } public string[] Members { get; } public MemberNotNullWhenAttribute(bool returnValue, string member) { ReturnValue = returnValue; Members = new string[1] { member }; } public MemberNotNullWhenAttribute(bool returnValue, params string[] members) { ReturnValue = returnValue; Members = members; } } } namespace System.Runtime.CompilerServices { [AttributeUsage(AttributeTargets.Parameter, AllowMultiple = false, Inherited = false)] internal sealed class CallerArgumentExpressionAttribute : Attribute { public string ParameterName { get; } public CallerArgumentExpressionAttribute(string parameterName) { ParameterName = parameterName; } } } namespace System.Runtime.InteropServices { [AttributeUsage(AttributeTargets.Method, AllowMultiple = false, Inherited = false)] internal sealed class LibraryImportAttribute : Attribute { public string LibraryName { get; } public string? EntryPoint { get; set; } public StringMarshalling StringMarshalling { get; set; } public Type? StringMarshallingCustomType { get; set; } public bool SetLastError { get; set; } public LibraryImportAttribute(string libraryName) { LibraryName = libraryName; } } internal enum StringMarshalling { Custom, Utf8, Utf16 } } namespace System.Threading { internal static class CancellationTokenExtensions { internal static CancellationTokenRegistration UnsafeRegister(this CancellationToken cancellationToken, Action<object> callback, object state) { return cancellationToken.Register(callback, state); } } } namespace System.Threading.Tasks { internal static class TaskToAsyncResult { private sealed class TaskAsyncResult : IAsyncResult { internal readonly Task _task; private readonly AsyncCallback _callback; public object AsyncState { get; } public bool CompletedSynchronously { get; } public bool IsCompleted => _task.IsCompleted; public WaitHandle AsyncWaitHandle => ((IAsyncResult)_task).AsyncWaitHandle; internal TaskAsyncResult(Task task, object state, AsyncCallback callback) { _task = task; AsyncState = state; if (task.IsCompleted) { CompletedSynchronously = true; callback?.Invoke(this); } else if (callback != null) { _callback = callback; _task.ConfigureAwait(continueOnCapturedContext: false).GetAwaiter().OnCompleted(delegate { _callback(this); }); } } } public static IAsyncResult Begin(Task task, AsyncCallback? callback, object? state) { ExceptionPolyfills.ThrowIfNull(task, "task"); return new TaskAsyncResult(task, state, callback); } public static void End(IAsyncResult asyncResult) { Unwrap(asyncResult).GetAwaiter().GetResult(); } public static TResult End<TResult>(IAsyncResult asyncResult) { return Unwrap<TResult>(asyncResult).GetAwaiter().GetResult(); } public static Task Unwrap(IAsyncResult asyncResult) { ExceptionPolyfills.ThrowIfNull(asyncResult, "asyncResult"); return (asyncResult as TaskAsyncResult)?._task ?? throw new ArgumentException(null, "asyncResult"); } public static Task<TResult> Unwrap<TResult>(IAsyncResult asyncResult) { ExceptionPolyfills.ThrowIfNull(asyncResult, "asyncResult"); return ((asyncResult as TaskAsyncResult)?._task as Task<TResult>) ?? throw new ArgumentException(null, "asyncResult"); } } } namespace System.IO { internal static class StreamHelpers { public static void ValidateCopyToArgs(Stream source, Stream destination, int bufferSize) { ExceptionPolyfills.ThrowIfNull(destination, "destination"); if (bufferSize <= 0) { throw new ArgumentOutOfRangeException("bufferSize", bufferSize, System.SR.ArgumentOutOfRange_NeedPosNum); } bool canRead = source.CanRead; if (!canRead && !source.CanWrite) { throw new ObjectDisposedException(null, System.SR.ObjectDisposed_StreamClosed); } bool canWrite = destination.CanWrite; if (!canWrite && !destination.CanRead) { throw new ObjectDisposedException("destination", System.SR.ObjectDisposed_StreamClosed); } if (!canRead) { throw new NotSupportedException(System.SR.NotSupported_UnreadableStream); } if (!canWrite) { throw new NotSupportedException(System.SR.NotSupported_UnwritableStream); } } } internal static class StreamExtensions { public static ValueTask<int> ReadAsync(this Stream stream, Memory<byte> buffer, CancellationToken cancellationToken = default(CancellationToken)) { if (MemoryMarshal.TryGetArray((ReadOnlyMemory<byte>)buffer, out ArraySegment<byte> segment)) { return new ValueTask<int>(stream.ReadAsync(segment.Array, segment.Offset, segment.Count, cancellationToken)); } byte[] array = ArrayPool<byte>.Shared.Rent(buffer.Length); return FinishReadAsync(stream.ReadAsync(array, 0, buffer.Length, cancellationToken), array, buffer); static async ValueTask<int> FinishReadAsync(Task<int> readTask, byte[] localBuffer, Memory<byte> localDestination) { try { int num = await readTask.ConfigureAwait(continueOnCapturedContext: false); new Span<byte>(localBuffer, 0, num).CopyTo(localDestination.Span); return num; } finally { ArrayPool<byte>.Shared.Return(localBuffer); } } } public static void Write(this Stream stream, ReadOnlyMemory<byte> buffer) { if (MemoryMarshal.TryGetArray(buffer, out var segment)) { stream.Write(segment.Array, segment.Offset, segment.Count); return; } byte[] array = ArrayPool<byte>.Shared.Rent(buffer.Length); try { buffer.Span.CopyTo(array); stream.Write(array, 0, buffer.Length); } finally { ArrayPool<byte>.Shared.Return(array); } } public static ValueTask WriteAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default(CancellationToken)) { if (MemoryMarshal.TryGetArray(buffer, out var segment)) { return new ValueTask(stream.WriteAsync(segment.Array, segment.Offset, segment.Count, cancellationToken)); } byte[] array = ArrayPool<byte>.Shared.Rent(buffer.Length); buffer.Span.CopyTo(array); return new ValueTask(FinishWriteAsync(stream.WriteAsync(array, 0, buffer.Length, cancellationToken), array)); } private static async Task FinishWriteAsync(Task writeTask, byte[] localBuffer) { try { await writeTask.ConfigureAwait(continueOnCapturedContext: false); } finally { ArrayPool<byte>.Shared.Return(localBuffer); } } public static Task CopyToAsync(this Stream source, Stream destination, CancellationToken cancellationToken = default(CancellationToken)) { return source.CopyToAsync(destination, 81920, cancellationToken); } } } namespace System.IO.Pipelines { internal sealed class BufferSegment : ReadOnlySequenceSegment<byte> { private IMemoryOwner<byte> _memoryOwner; private byte[] _array; private BufferSegment _next; private int _end; public int End { get { return _end; } set { _end = value; base.Memory = AvailableMemory.Slice(0, value); } } public BufferSegment? NextSegment { get { return _next; } set { base.Next = value; _next = value; } } internal object? MemoryOwner => ((object)_memoryOwner) ?? ((object)_array); public Memory<byte> AvailableMemory { get; private set; } public int Length => End; public int WritableBytes { [MethodImpl(MethodImplOptions.AggressiveInlining)] get { return AvailableMemory.Length - End; } } public void SetOwnedMemory(IMemoryOwner<byte> memoryOwner) { _memoryOwner = memoryOwner; AvailableMemory = memoryOwner.Memory; } public void SetOwnedMemory(byte[] arrayPoolBuffer) { _array = arrayPoolBuffer; AvailableMemory = arrayPoolBuffer; } public void Reset() { ResetMemory(); base.Next = null; base.RunningIndex = 0L; _next = null; } public void ResetMemory() { IMemoryOwner<byte> memoryOwner = _memoryOwner; if (memoryOwner != null) { _memoryOwner = null; memoryOwner.Dispose(); } else { ArrayPool<byte>.Shared.Return(_array); _array = null; } base.Memory = default(ReadOnlyMemory<byte>); _end = 0; AvailableMemory = default(Memory<byte>); } public void SetNext(BufferSegment segment) { NextSegment = segment; segment = this; while (segment.Next != null) { segment.NextSegment.RunningIndex = segment.RunningIndex + segment.Length; segment = segment.NextSegment; } } [MethodImpl(MethodImplOptions.AggressiveInlining)] internal static long GetLength(BufferSegment startSegment, int startIndex, BufferSegment endSegment, int endIndex) { return endSegment.RunningIndex + (uint)endIndex - (startSegment.RunningIndex + (uint)startIndex); } [MethodImpl(MethodImplOptions.AggressiveInlining)] internal static long GetLength(long startPosition, BufferSegment endSegment, int endIndex) { return endSegment.RunningIndex + (uint)endIndex - startPosition; } } internal readonly struct CompletionData { public Action<object?> Completion { get; } public object? CompletionState { get; } public ExecutionContext? ExecutionContext { get; } public SynchronizationContext? SynchronizationContext { get; } public CompletionData(Action<object?> completion, object? completionState, ExecutionContext? executionContext, SynchronizationContext? synchronizationContext) { Completion = completion; CompletionState = completionState; ExecutionContext = executionContext; SynchronizationContext = synchronizationContext; } } public struct FlushResult { internal ResultFlags _resultFlags; public bool IsCanceled => (_resultFlags & ResultFlags.Canceled) != 0; public bool IsCompleted => (_resultFlags & ResultFlags.Completed) != 0; public FlushResult(bool isCanceled, bool isCompleted) { _resultFlags = ResultFlags.None; if (isCanceled) { _resultFlags |= ResultFlags.Canceled; } if (isCompleted) { _resultFlags |= ResultFlags.Completed; } } } internal sealed class InlineScheduler : PipeScheduler { public override void Schedule(Action<object?> action, object? state) { action(state); } internal override void UnsafeSchedule(Action<object?> action, object? state) { action(state); } } public interface IDuplexPipe { PipeReader Input { get; } PipeWriter Output { get; } } internal struct BufferSegmentStack { private readonly struct SegmentAsValueType { private readonly BufferSegment _value; private SegmentAsValueType(BufferSegment value) { _value = value; } public static implicit operator SegmentAsValueType(BufferSegment s) { return new SegmentAsValueType(s); } public static implicit operator BufferSegment(SegmentAsValueType s) { return s._value; } } private SegmentAsValueType[] _array; private int _size; public int Count => _size; public BufferSegmentStack(int size) { _array = new SegmentAsValueType[size]; _size = 0; } public bool TryPop([NotNullWhen(true)] out BufferSegment? result) { int num = _size - 1; SegmentAsValueType[] array = _array; if ((uint)num >= (uint)array.Length) { result = null; return false; } _size = num; result = array[num]; array[num] = default(SegmentAsValueType); return true; } public void Push(BufferSegment item) { int size = _size; SegmentAsValueType[] array = _array; if ((uint)size < (uint)array.Length) { array[size] = item; _size = size + 1; } else { PushWithResize(item); } } [MethodImpl(MethodImplOptions.NoInlining)] private void PushWithResize(BufferSegment item) { Array.Resize(ref _array, 2 * _array.Length); _array[_size] = item; _size++; } } public sealed class Pipe { private sealed class DefaultPipeReader : PipeReader, IValueTaskSource<ReadResult> { private readonly Pipe _pipe; public DefaultPipeReader(Pipe pipe) { _pipe = pipe; } public override bool TryRead(out ReadResult result) { return _pipe.TryRead(out result); } public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken)) { return _pipe.ReadAsync(cancellationToken); } protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumBytes, CancellationToken cancellationToken) { return _pipe.ReadAtLeastAsync(minimumBytes, cancellationToken); } public override void AdvanceTo(SequencePosition consumed) { _pipe.AdvanceReader(in consumed); } public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) { _pipe.AdvanceReader(in consumed, in examined); } public override void CancelPendingRead() { _pipe.CancelPendingRead(); } public override void Complete(Exception exception = null) { _pipe.CompleteReader(exception); } public override void OnWriterCompleted(Action<Exception, object> callback, object state) { _pipe.OnWriterCompleted(callback, state); } public ValueTaskSourceStatus GetStatus(short token) { return _pipe.GetReadAsyncStatus(); } public ReadResult GetResult(short token) { return _pipe.GetReadAsyncResult(); } public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) { _pipe.OnReadAsyncCompleted(continuation, state, flags); } } private sealed class DefaultPipeWriter : PipeWriter, IValueTaskSource<FlushResult> { private readonly Pipe _pipe; public override bool CanGetUnflushedBytes => true; public override long UnflushedBytes => _pipe.GetUnflushedBytes(); public DefaultPipeWriter(Pipe pipe) { _pipe = pipe; } public override void Complete(Exception exception = null) { _pipe.CompleteWriter(exception); } public override void CancelPendingFlush() { _pipe.CancelPendingFlush(); } public override void OnReaderCompleted(Action<Exception, object> callback, object state) { _pipe.OnReaderCompleted(callback, state); } public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default(CancellationToken)) { return _pipe.FlushAsync(cancellationToken); } public override void Advance(int bytes) { _pipe.Advance(bytes); } public override Memory<byte> GetMemory(int sizeHint = 0) { return _pipe.GetMemory(sizeHint); } public override Span<byte> GetSpan(int sizeHint = 0) { return _pipe.GetSpan(sizeHint); } public ValueTaskSourceStatus GetStatus(short token) { return _pipe.GetFlushAsyncStatus(); } public FlushResult GetResult(short token) { return _pipe.GetFlushAsyncResult(); } public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) { _pipe.OnFlushAsyncCompleted(continuation, state, flags); } public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken)) { return _pipe.WriteAsync(source, cancellationToken); } } private static readonly Action<object> s_signalReaderAwaitable = delegate(object state) { ((Pipe)state).ReaderCancellationRequested(); }; private static readonly Action<object> s_signalWriterAwaitable = delegate(object state) { ((Pipe)state).WriterCancellationRequested(); }; private static readonly Action<object> s_invokeCompletionCallbacks = delegate(object state) { ((PipeCompletionCallbacks)state).Execute(); }; private static readonly ContextCallback s_executionContextRawCallback = ExecuteWithoutExecutionContext; private static readonly SendOrPostCallback s_syncContextExecutionContextCallback = ExecuteWithExecutionContext; private static readonly SendOrPostCallback s_syncContextExecuteWithoutExecutionContextCallback = ExecuteWithoutExecutionContext; private static readonly Action<object> s_scheduleWithExecutionContextCallback = ExecuteWithExecutionContext; private BufferSegmentStack _bufferSegmentPool; private readonly DefaultPipeReader _reader; private readonly DefaultPipeWriter _writer; private readonly PipeOptions _options; private readonly object _sync = new object(); private long _unconsumedBytes; private long _unflushedBytes; private PipeAwaitable _readerAwaitable; private PipeAwaitable _writerAwaitable; private PipeCompletion _writerCompletion; private PipeCompletion _readerCompletion; private long _lastExaminedIndex = -1L; private BufferSegment _readHead; private int _readHeadIndex; private bool _disposed; private BufferSegment _readTail; private int _readTailIndex; private int _minimumReadBytes; private BufferSegment _writingHead; private Memory<byte> _writingHeadMemory; private int _writingHeadBytesBuffered; private PipeOperationState _operationState; private bool UseSynchronizationContext => _options.UseSynchronizationContext; private int MinimumSegmentSize => _options.MinimumSegmentSize; private long PauseWriterThreshold => _options.PauseWriterThreshold; private long ResumeWriterThreshold => _options.ResumeWriterThreshold; private PipeScheduler ReaderScheduler => _options.ReaderScheduler; private PipeScheduler WriterScheduler => _options.WriterScheduler; private object SyncObj => _sync; internal long Length => _unconsumedBytes; public PipeReader Reader => _reader; public PipeWriter Writer => _writer; public Pipe() : this(PipeOptions.Default) { } public Pipe(PipeOptions options) { if (options == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options); } _bufferSegmentPool = new BufferSegmentStack(options.InitialSegmentPoolSize); _operationState = default(PipeOperationState); _readerCompletion = default(PipeCompletion); _writerCompletion = default(PipeCompletion); _options = options; _readerAwaitable = new PipeAwaitable(completed: false, UseSynchronizationContext); _writerAwaitable = new PipeAwaitable(completed: true, UseSynchronizationContext); _reader = new DefaultPipeReader(this); _writer = new DefaultPipeWriter(this); } private void ResetState() { _readerCompletion.Reset(); _writerCompletion.Reset(); _readerAwaitable = new PipeAwaitable(completed: false, UseSynchronizationContext); _writerAwaitable = new PipeAwaitable(completed: true, UseSynchronizationContext); _readTailIndex = 0; _readHeadIndex = 0; _lastExaminedIndex = -1L; _unflushedBytes = 0L; _unconsumedBytes = 0L; } internal Memory<byte> GetMemory(int sizeHint) { if (_writerCompletion.IsCompleted) { ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); } if (sizeHint < 0) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint); } AllocateWriteHeadIfNeeded(sizeHint); return _writingHeadMemory; } internal Span<byte> GetSpan(int sizeHint) { if (_writerCompletion.IsCompleted) { ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); } if (sizeHint < 0) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint); } AllocateWriteHeadIfNeeded(sizeHint); return _writingHeadMemory.Span; } [MethodImpl(MethodImplOptions.AggressiveInlining)] private void AllocateWriteHeadIfNeeded(int sizeHint) { if (!_operationState.IsWritingActive || _writingHeadMemory.Length == 0 || _writingHeadMemory.Length < sizeHint) { AllocateWriteHeadSynchronized(sizeHint); } } private void AllocateWriteHeadSynchronized(int sizeHint) { lock (SyncObj) { _operationState.BeginWrite(); if (_writingHead == null) { BufferSegment readTail = AllocateSegment(sizeHint); _writingHead = (_readHead = (_readTail = readTail)); _lastExaminedIndex = 0L; return; } int length = _writingHeadMemory.Length; if (length == 0 || length < sizeHint) { if (_writingHeadBytesBuffered > 0) { _writingHead.End += _writingHeadBytesBuffered; _writingHeadBytesBuffered = 0; } if (_writingHead.Length == 0) { _writingHead.ResetMemory(); RentMemory(_writingHead, sizeHint); } else { BufferSegment bufferSegment = AllocateSegment(sizeHint); _writingHead.SetNext(bufferSegment); _writingHead = bufferSegment; } } } } private BufferSegment AllocateSegment(int sizeHint) { BufferSegment bufferSegment = CreateSegmentUnsynchronized(); RentMemory(bufferSegment, sizeHint); return bufferSegment; } private void RentMemory(BufferSegment segment, int sizeHint) { MemoryPool<byte> memoryPool = null; int num = -1; if (!_options.IsDefaultSharedMemoryPool) { memoryPool = _options.Pool; num = memoryPool.MaxBufferSize; } if (sizeHint <= num) { segment.SetOwnedMemory(memoryPool.Rent(GetSegmentSize(sizeHint, num))); } else { int segmentSize = GetSegmentSize(sizeHint); segment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(segmentSize)); } _writingHeadMemory = segment.AvailableMemory; } private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue) { sizeHint = Math.Max(MinimumSegmentSize, sizeHint); return Math.Min(maxBufferSize, sizeHint); } private BufferSegment CreateSegmentUnsynchronized() { if (_bufferSegmentPool.TryPop(out BufferSegment result)) { return result; } return new BufferSegment(); } private void ReturnSegmentUnsynchronized(BufferSegment segment) { if (_bufferSegmentPool.Count < _options.MaxSegmentPoolSize) { _bufferSegmentPool.Push(segment); } } internal bool CommitUnsynchronized() { _operationState.EndWrite(); if (_unflushedBytes == 0L) { return false; } _writingHead.End += _writingHeadBytesBuffered; _readTail = _writingHead; _readTailIndex = _writingHead.End; long unconsumedBytes = _unconsumedBytes; _unconsumedBytes += _unflushedBytes; bool result = true; if (_unconsumedBytes < _minimumReadBytes) { result = false; } else if (PauseWriterThreshold > 0 && unconsumedBytes < PauseWriterThreshold && _unconsumedBytes >= PauseWriterThreshold && !_readerCompletion.IsCompleted) { _writerAwaitable.SetUncompleted(); } _unflushedBytes = 0L; _writingHeadBytesBuffered = 0; return result; } internal void Advance(int bytes) { lock (SyncObj) { if ((uint)bytes > (uint)_writingHeadMemory.Length) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes); } if (!_readerCompletion.IsCompleted) { AdvanceCore(bytes); } } } [MethodImpl(MethodImplOptions.AggressiveInlining)] private void AdvanceCore(int bytesWritten) { _unflushedBytes += bytesWritten; _writingHeadBytesBuffered += bytesWritten; _writingHeadMemory = _writingHeadMemory.Slice(bytesWritten); } internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken) { if (cancellationToken.IsCancellationRequested) { return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken)); } CompletionData completionData; ValueTask<FlushResult> result; lock (SyncObj) { PrepareFlushUnsynchronized(out completionData, out result, cancellationToken); } TrySchedule(ReaderScheduler, in completionData); return result; } private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken) { bool num = CommitUnsynchronized(); _writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this); if (_writerAwaitable.IsCompleted) { FlushResult result2 = default(FlushResult); GetFlushResult(ref result2); result = new ValueTask<FlushResult>(result2); } else { result = new ValueTask<FlushResult>(_writer, 0); } if (num) { _readerAwaitable.Complete(out completionData); } else { completionData = default(CompletionData); } } internal void CompleteWriter(Exception? exception) { PipeCompletionCallbacks pipeCompletionCallbacks; CompletionData completionData; bool isCompleted; lock (SyncObj) { CommitUnsynchronized(); pipeCompletionCallbacks = _writerCompletion.TryComplete(exception); _readerAwaitable.Complete(out completionData); isCompleted = _readerCompletion.IsCompleted; } if (isCompleted) { CompletePipe(); } if (pipeCompletionCallbacks != null) { ScheduleCallbacks(ReaderScheduler, pipeCompletionCallbacks); } TrySchedule(ReaderScheduler, in completionData); } internal void AdvanceReader(in SequencePosition consumed) { AdvanceReader(in consumed, in consumed); } internal void AdvanceReader(in SequencePosition consumed, in SequencePosition examined) { if (_readerCompletion.IsCompleted) { ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); } AdvanceReader((BufferSegment)consumed.GetObject(), consumed.GetInteger(), (BufferSegment)examined.GetObject(), examined.GetInteger()); } private void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex) { if (consumedSegment != null && examinedSegment != null && BufferSegment.GetLength(consumedSegment, consumedIndex, examinedSegment, examinedIndex) < 0) { ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition(); } BufferSegment bufferSegment = null; BufferSegment returnEnd = null; CompletionData completionData = default(CompletionData); lock (SyncObj) { bool flag = false; if (examinedSegment == _readTail) { flag = examinedIndex == _readTailIndex; } if (examinedSegment != null && _lastExaminedIndex >= 0) { long length = BufferSegment.GetLength(_lastExaminedIndex, examinedSegment, examinedIndex); long unconsumedBytes = _unconsumedBytes; _unconsumedBytes -= length; _lastExaminedIndex = examinedSegment.RunningIndex + examinedIndex; if (unconsumedBytes >= ResumeWriterThreshold && _unconsumedBytes < ResumeWriterThreshold) { _writerAwaitable.Complete(out completionData); } } if (consumedSegment != null) { if (_readHead == null) { ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor(); return; } bufferSegment = _readHead; returnEnd = consumedSegment; if (consumedIndex == returnEnd.Length) { if (_writingHead != returnEnd) { MoveReturnEndToNextBlock(); } else if (_writingHeadBytesBuffered == 0 && !_operationState.IsWritingActive) { _writingHead = null; _writingHeadMemory = default(Memory<byte>); MoveReturnEndToNextBlock(); } else { _readHead = consumedSegment; _readHeadIndex = consumedIndex; } } else { _readHead = consumedSegment; _readHeadIndex = consumedIndex; } } if (flag && !_writerCompletion.IsCompleted) { _readerAwaitable.SetUncompleted(); } while (bufferSegment != null && bufferSegment != returnEnd) { BufferSegment? nextSegment = bufferSegment.NextSegment; bufferSegment.Reset(); ReturnSegmentUnsynchronized(bufferSegment); bufferSegment = nextSegment; } _operationState.EndRead(); } TrySchedule(WriterScheduler, in completionData); void MoveReturnEndToNextBlock() { BufferSegment nextSegment2 = returnEnd.NextSegment; if (_readTail == returnEnd) { _readTail = nextSegment2; _readTailIndex = 0; } _readHead = nextSegment2; _readHeadIndex = 0; returnEnd = nextSegment2; } } internal void CompleteReader(Exception? exception) { PipeCompletionCallbacks pipeCompletionCallbacks; CompletionData completionData; bool isCompleted; lock (SyncObj) { if (_operationState.IsReadingActive) { _operationState.EndRead(); } pipeCompletionCallbacks = _readerCompletion.TryComplete(exception); _writerAwaitable.Complete(out completionData); isCompleted = _writerCompletion.IsCompleted; } if (isCompleted) { CompletePipe(); } if (pipeCompletionCallbacks != null) { ScheduleCallbacks(WriterScheduler, pipeCompletionCallbacks); } TrySchedule(WriterScheduler, in completionData); } internal void OnWriterCompleted(Action<Exception?, object?> callback, object? state) { if (callback == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback); } PipeCompletionCallbacks pipeCompletionCallbacks; lock (SyncObj) { pipeCompletionCallbacks = _writerCompletion.AddCallback(callback, state); } if (pipeCompletionCallbacks != null) { ScheduleCallbacks(ReaderScheduler, pipeCompletionCallbacks); } } internal void CancelPendingRead() { CompletionData completionData; lock (SyncObj) { _readerAwaitable.Cancel(out completionData); } TrySchedule(ReaderScheduler, in completionData); } internal void CancelPendingFlush() { CompletionData completionData; lock (SyncObj) { _writerAwaitable.Cancel(out completionData); } TrySchedule(WriterScheduler, in completionData); } internal void OnReaderCompleted(Action<Exception?, object?> callback, object? state) { if (callback == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback); } PipeCompletionCallbacks pipeCompletionCallbacks; lock (SyncObj) { pipeCompletionCallbacks = _readerCompletion.AddCallback(callback, state); } if (pipeCompletionCallbacks != null) { ScheduleCallbacks(WriterScheduler, pipeCompletionCallbacks); } } internal ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationToken token) { if (_readerCompletion.IsCompleted) { ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); } if (token.IsCancellationRequested) { return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(token)); } CompletionData completionData = default(CompletionData); ValueTask<ReadResult> result2; lock (SyncObj) { _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this); if (_readerAwaitable.IsCompleted) { GetReadResult(out var result); if (_unconsumedBytes >= minimumBytes || result.IsCanceled || result.IsCompleted) { return new ValueTask<ReadResult>(result); } _readerAwaitable.SetUncompleted(); _operationState.EndRead(); _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this); } if (!_writerAwaitable.IsCompleted) { _writerAwaitable.Complete(out completionData); } _minimumReadBytes = minimumBytes; result2 = new ValueTask<ReadResult>(_reader, 0); } TrySchedule(WriterScheduler, in completionData); return result2; } internal ValueTask<ReadResult> ReadAsync(CancellationToken token) { if (_readerCompletion.IsCompleted) { ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); } if (token.IsCancellationRequested) { return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(token)); } lock (SyncObj) { _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this); if (_readerAwaitable.IsCompleted) { GetReadResult(out var result); return new ValueTask<ReadResult>(result); } return new ValueTask<ReadResult>(_reader, 0); } } internal bool TryRead(out ReadResult result) { lock (SyncObj) { if (_readerCompletion.IsCompleted) { ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); } if (_unconsumedBytes > 0 || _readerAwaitable.IsCompleted) { GetReadResult(out result); return true; } if (_readerAwaitable.IsRunning) { ThrowHelper.ThrowInvalidOperationException_AlreadyReading(); } _operationState.BeginReadTentative(); result = default(ReadResult); return false; } } private static void ScheduleCallbacks(PipeScheduler scheduler, PipeCompletionCallbacks completionCallbacks) { scheduler.UnsafeSchedule(s_invokeCompletionCallbacks, completionCallbacks); } [MethodImpl(MethodImplOptions.AggressiveInlining)] private static void TrySchedule(PipeScheduler scheduler, in CompletionData completionData) { Action<object> completion = completionData.Completion; if (completion != null) { if (completionData.SynchronizationContext == null && completionData.ExecutionContext == null) { scheduler.UnsafeSchedule(completion, completionData.CompletionState); } else { ScheduleWithContext(scheduler, in completionData); } } } [MethodImpl(MethodImplOptions.NoInlining)] private static void ScheduleWithContext(PipeScheduler scheduler, in CompletionData completionData) { if (completionData.SynchronizationContext == null) { scheduler.UnsafeSchedule(s_scheduleWithExecutionContextCallback, completionData); } else if (completionData.ExecutionContext == null) { completionData.SynchronizationContext.Post(s_syncContextExecuteWithoutExecutionContextCallback, completionData); } else { completionData.SynchronizationContext.Post(s_syncContextExecutionContextCallback, completionData); } } private static void ExecuteWithoutExecutionContext(object state) { CompletionData completionData = (CompletionData)state; completionData.Completion(completionData.CompletionState); } private static void ExecuteWithExecutionContext(object state) { ExecutionContext.Run(((CompletionData)state).ExecutionContext, s_executionContextRawCallback, state); } private void CompletePipe() { lock (SyncObj) { if (!_disposed) { _disposed = true; BufferSegment bufferSegment = _readHead ?? _readTail; while (bufferSegment != null) { BufferSegment bufferSegment2 = bufferSegment; bufferSegment = bufferSegment.NextSegment; bufferSegment2.Reset(); } _writingHead = null; _writingHeadMemory = default(Memory<byte>); _readHead = null; _readTail = null; _lastExaminedIndex = -1L; } } } internal ValueTaskSourceStatus GetReadAsyncStatus() { if (_readerAwaitable.IsCompleted) { if (_writerCompletion.IsFaulted) { return ValueTaskSourceStatus.Faulted; } return ValueTaskSourceStatus.Succeeded; } return ValueTaskSourceStatus.Pending; } internal void OnReadAsyncCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags) { CompletionData completionData; bool doubleCompletion; lock (SyncObj) { _readerAwaitable.OnCompleted(continuation, state, flags, out completionData, out doubleCompletion); } if (doubleCompletion) { Writer.Complete(ThrowHelper.CreateInvalidOperationException_NoConcurrentOperation()); } TrySchedule(ReaderScheduler, in completionData); } internal ReadResult GetReadAsyncResult() { CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration); CancellationToken cancellationToken = default(CancellationToken); ReadResult result; try { lock (SyncObj) { if (!_readerAwaitable.IsCompleted) { ThrowHelper.ThrowInvalidOperationException_GetResultNotCompleted(); } cancellationTokenRegistration = _readerAwaitable.ReleaseCancellationTokenRegistration(out cancellationToken); GetReadResult(out result); } } finally { cancellationTokenRegistration.Dispose(); } if (result.IsCanceled) { cancellationToken.ThrowIfCancellationRequested(); } return result; } private void GetReadResult(out ReadResult result) { bool isCompleted = _writerCompletion.IsCompletedOrThrow(); bool flag = _readerAwaitable.ObserveCancellation(); BufferSegment readHead = _readHead; if (readHead != null) { ReadOnlySequence<byte> buffer = new ReadOnlySequence<byte>(readHead, _readHeadIndex, _readTail, _readTailIndex); result = new ReadResult(buffer, flag, isCompleted); } else { result = new ReadResult(default(ReadOnlySequence<byte>), flag, isCompleted); } if (flag) { _operationState.BeginReadTentative(); } else { _operationState.BeginRead(); } _minimumReadBytes = 0; } internal ValueTaskSourceStatus GetFlushAsyncStatus() { if (_writerAwaitable.IsCompleted) { if (_readerCompletion.IsFaulted) { return ValueTaskSourceStatus.Faulted; } return ValueTaskSourceStatus.Succeeded; } return ValueTaskSourceStatus.Pending; } internal FlushResult GetFlushAsyncResult() { FlushResult result = default(FlushResult); CancellationToken cancellationToken = default(CancellationToken); CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration); try { lock (SyncObj) { if (!_writerAwaitable.IsCompleted) { ThrowHelper.ThrowInvalidOperationException_GetResultNotCompleted(); } GetFlushResult(ref result); cancellationTokenRegistration = _writerAwaitable.ReleaseCancellationTokenRegistration(out cancellationToken); return result; } } finally { cancellationTokenRegistration.Dispose(); cancellationToken.ThrowIfCancellationRequested(); } } internal long GetUnflushedBytes() { return _unflushedBytes; } private void GetFlushResult(ref FlushResult result) { if (_writerAwaitable.ObserveCancellation()) { result._resultFlags |= ResultFlags.Canceled; } if (_readerCompletion.IsCompletedOrThrow()) { result._resultFlags |= ResultFlags.Completed; } } internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken) { if (_writerCompletion.IsCompleted) { ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); } if (_readerCompletion.IsCompletedOrThrow()) { return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true)); } if (cancellationToken.IsCancellationRequested) { return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken)); } CompletionData completionData; ValueTask<FlushResult> result; lock (SyncObj) { AllocateWriteHeadIfNeeded(0); if (source.Length <= _writingHeadMemory.Length) { source.CopyTo(_writingHeadMemory); AdvanceCore(source.Length); } else { WriteMultiSegment(source.Span); } PrepareFlushUnsynchronized(out completionData, out result, cancellationToken); } TrySchedule(ReaderScheduler, in completionData); return result; } private void WriteMultiSegment(ReadOnlySpan<byte> source) { Span<byte> span = _writingHeadMemory.Span; while (true) { int num = Math.Min(span.Length, source.Length); source.Slice(0, num).CopyTo(span); source = source.Slice(num); AdvanceCore(num); if (source.Length != 0) { _writingHead.End += _writingHeadBytesBuffered; _writingHeadBytesBuffered = 0; BufferSegment bufferSegment = AllocateSegment(0); _writingHead.SetNext(bufferSegment); _writingHead = bufferSegment; span = _writingHeadMemory.Span; continue; } break; } } internal void OnFlushAsyncCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags) { CompletionData completionData; bool doubleCompletion; lock (SyncObj) { _writerAwaitable.OnCompleted(continuation, state, flags, out completionData, out doubleCompletion); } if (doubleCompletion) { Reader.Complete(ThrowHelper.CreateInvalidOperationException_NoConcurrentOperation()); } TrySchedule(WriterScheduler, in completionData); } private void ReaderCancellationRequested() { CompletionData completionData; lock (SyncObj) { _readerAwaitable.CancellationTokenFired(out completionData); } TrySchedule(ReaderScheduler, in completionData); } private void WriterCancellationRequested() { CompletionData completionData; lock (SyncObj) { _writerAwaitable.CancellationTokenFired(out completionData); } TrySchedule(WriterScheduler, in completionData); } public void Reset() { lock (SyncObj) { if (!_disposed) { ThrowHelper.ThrowInvalidOperationException_ResetIncompleteReaderWriter(); } _disposed = false; ResetState(); } } } [DebuggerDisplay("CanceledState = {_awaitableState}, IsCompleted = {IsCompleted}")] internal struct PipeAwaitable { [Flags] private enum AwaitableState { None = 0, Completed = 1, Running = 2, Canceled = 4, UseSynchronizationContext = 8 } private sealed class SchedulingContext { public SynchronizationContext SynchronizationContext { get; set; } public ExecutionContext ExecutionContext { get; set; } } private AwaitableState _awaitableState; private Action<object> _completion; private object _completionState; private SchedulingContext _schedulingContext; private CancellationTokenRegistration _cancellationTokenRegistration; private CancellationToken _cancellationToken; private CancellationToken CancellationToken => _cancellationToken; public bool IsCompleted => (_awaitableState & (AwaitableState.Completed | AwaitableState.Canceled)) != 0; public bool IsRunning => (_awaitableState & AwaitableState.Running) != 0; public PipeAwaitable(bool completed, bool useSynchronizationContext) { _awaitableState = (completed ? AwaitableState.Completed : AwaitableState.None) | (useSynchronizationContext ? AwaitableState.UseSynchronizationContext : AwaitableState.None); _completion = null; _completionState = null; _cancellationTokenRegistration = default(CancellationTokenRegistration); _schedulingContext = null; _cancellationToken = CancellationToken.None; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginOperation(CancellationToken cancellationToken, Action<object?> callback, object? state) { if (cancellationToken.CanBeCanceled && !IsCompleted) { _cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, callback, state); if (_cancellationTokenRegistration == default(CancellationTokenRegistration)) { cancellationToken.ThrowIfCancellationRequested(); } _cancellationToken = cancellationToken; } _awaitableState |= AwaitableState.Running; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Complete(out CompletionData completionData) { ExtractCompletion(out completionData); _awaitableState |= AwaitableState.Completed; } [MethodImpl(MethodImplOptions.AggressiveInlining)] private void ExtractCompletion(out CompletionData completionData) { Action<object> completion = _completion; object completionState = _completionState; SchedulingContext schedulingContext = _schedulingContext; ExecutionContext executionContext = schedulingContext?.ExecutionContext; SynchronizationContext synchronizationContext = schedulingContext?.SynchronizationContext; _completion = null; _completionState = null; _schedulingContext = null; completionData = ((completion != null) ? new CompletionData(completion, completionState, executionContext, synchronizationContext) : default(CompletionData)); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void SetUncompleted() { _awaitableState &= ~AwaitableState.Completed; } public void OnCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags, out CompletionData completionData, out bool doubleCompletion) { completionData = default(CompletionData); doubleCompletion = _completion != null; if (IsCompleted | doubleCompletion) { completionData = new CompletionData(continuation, state, _schedulingContext?.ExecutionContext, _schedulingContext?.SynchronizationContext); return; } _completion = continuation; _completionState = state; if ((_awaitableState & AwaitableState.UseSynchronizationContext) != 0 && (flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0) { SynchronizationContext current = SynchronizationContext.Current; if (current != null && current.GetType() != typeof(SynchronizationContext)) { if (_schedulingContext == null) { _schedulingContext = new SchedulingContext(); } _schedulingContext.SynchronizationContext = current; } } if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0) { if (_schedulingContext == null) { _schedulingContext = new SchedulingContext(); } _schedulingContext.ExecutionContext = ExecutionContext.Capture(); } } public void Cancel(out CompletionData completionData) { ExtractCompletion(out completionData); _awaitableState |= AwaitableState.Canceled; } public void CancellationTokenFired(out CompletionData completionData) { if (CancellationToken.IsCancellationRequested) { Cancel(out completionData); } else { completionData = default(CompletionData); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool ObserveCancellation() { bool result = (_awaitableState & AwaitableState.Canceled) == AwaitableState.Canceled; _awaitableState &= ~(AwaitableState.Running | AwaitableState.Canceled); return result; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public CancellationTokenRegistration ReleaseCancellationTokenRegistration(out CancellationToken cancellationToken) { cancellationToken = CancellationToken; CancellationTokenRegistration cancellationTokenRegistration = _cancellationTokenRegistration; _cancellationToken = default(CancellationToken); _cancellationTokenRegistration = default(CancellationTokenRegistration); return cancellationTokenRegistration; } } [DebuggerDisplay("IsCompleted = {IsCompleted}")] internal struct PipeCompletion { private static readonly object s_completedSuccessfully = new object(); private object _state; private List<PipeCompletionCallback> _callbacks; public bool IsCompleted => _state != null; public bool IsFaulted => _state is ExceptionDispatchInfo; public PipeCompletionCallbacks? TryComplete(Exception? exception = null) { if (_state == null) { if (exception != null) { _state = ExceptionDispatchInfo.Capture(exception); } else { _state = s_completedSuccessfully; } } return GetCallbacks(); } public PipeCompletionCallbacks? AddCallback(Action<Exception?, object?> callback, object? state) { if (_callbacks == null) { _callbacks = new List<PipeCompletionCallback>(); } _callbacks.Add(new PipeCompletionCallback(callback, state)); if (IsCompleted) { return GetCallbacks(); } return null; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool IsCompletedOrThrow() { if (!IsCompleted) { return false; } if (_state is ExceptionDispatchInfo exceptionDispatchInfo) { exceptionDispatchInfo.Throw(); } return true; } private PipeCompletionCallbacks GetCallbacks() { List<PipeCompletionCallback> callbacks = _callbacks; if (callbacks == null) { return null; } _callbacks = null; return new PipeCompletionCallbacks(callbacks, _state as ExceptionDispatchInfo); } public void Reset() { _state = null; } public override string ToString() { return string.Format("{0}: {1}", "IsCompleted", IsCompleted); } } internal readonly struct PipeCompletionCallback { public readonly Action<Exception?, object?> Callback; public readonly object? State; public PipeCompletionCallback(Action<Exception?, object?> callback, object? state) { Callback = callback; State = state; } } internal sealed class PipeCompletionCallbacks { private readonly List<PipeCompletionCallback> _callbacks; private readonly Exception _exception; public PipeCompletionCallbacks(List<PipeCompletionCallback> callbacks, ExceptionDispatchInfo? edi) { _callbacks = callbacks; _exception = edi?.SourceException; } public void Execute() { int count = _callbacks.Count; if (count != 0) { List<Exception> exceptions = null; for (int i = 0; i < count; i++) { PipeCompletionCallback callback = _callbacks[i]; Execute(callback, ref exceptions); } if (exceptions != null) { throw new AggregateException(exceptions); } } } private void Execute(PipeCompletionCallback callback, ref List<Exception> exceptions) { try { callback.Callback(_exception, callback.State); } catch (Exception item) { if (exceptions == null) { exceptions = new List<Exception>(); } exceptions.Add(item); } } } public class PipeOptions { private const int DefaultMinimumSegmentSize = 4096; public static PipeOptions Default { get; } = new PipeOptions(null, null, null, -1L, -1L); public bool UseSynchronizationContext { get; } public long PauseWriterThreshold { get; } public long ResumeWriterThreshold { get; } public int MinimumSegmentSize { get; } public PipeScheduler WriterScheduler { get; } public PipeScheduler ReaderScheduler { get; } public MemoryPool<byte> Pool { get; } internal bool IsDefaultSharedMemoryPool { get; } internal int InitialSegmentPoolSize { get; } internal int MaxSegmentPoolSize { get; } public PipeOptions(MemoryPool<byte>? pool = null, PipeScheduler? readerScheduler = null, PipeScheduler? writerScheduler = null, long pauseWriterThreshold = -1L, long resumeWriterThreshold = -1L, int minimumSegmentSize = -1, bool useSynchronizationContext = true) { MinimumSegmentSize = ((minimumSegmentSize == -1) ? 4096 : minimumSegmentSize); InitialSegmentPoolSize = 4; MaxSegmentPoolSize = 256; if (pauseWriterThreshold == -1) { pauseWriterThreshold = 65536L; } else if (pauseWriterThreshold < 0) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.pauseWriterThreshold); } switch (resumeWriterThreshold) { case -1L: resumeWriterThreshold = 32768L; break; case 0L: resumeWriterThreshold = 1L; break; } if (resumeWriterThreshold < 0 || (pauseWriterThreshold > 0 && resumeWriterThreshold > pauseWriterThreshold)) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.resumeWriterThreshold); } Pool = pool ?? MemoryPool<byte>.Shared; IsDefaultSharedMemoryPool = Pool == MemoryPool<byte>.Shared; ReaderScheduler = readerScheduler ?? PipeScheduler.ThreadPool; WriterScheduler = writerScheduler ?? PipeScheduler.ThreadPool; PauseWriterThreshold = pauseWriterThreshold; ResumeWriterThreshold = resumeWriterThreshold; UseSynchronizationContext = useSynchronizationContext; } } public abstract class PipeReader { private PipeReaderStream _stream; public abstract bool TryRead(out ReadResult result); public abstract ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken)); public ValueTask<ReadResult> ReadAtLeastAsync(int minimumSize, CancellationToken cancellationToken = default(CancellationToken)) { if (minimumSize < 0) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.minimumSize); } return ReadAtLeastAsyncCore(minimumSize, cancellationToken); } protected virtual async ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken) { ReadResult result; while (true) { result = await ReadAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false); ReadOnlySequence<byte> buffer = result.Buffer; if (buffer.Length >= minimumSize || result.IsCompleted || result.IsCanceled) { break; } AdvanceTo(buffer.Start, buffer.End); } return result; } public abstract void AdvanceTo(SequencePosition consumed); public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined); public virtual Stream AsStream(bool leaveOpen = false) { if (_stream == null) { _stream = new PipeReaderStream(this, leaveOpen); } else if (leaveOpen) { _stream.LeaveOpen = leaveOpen; } return _stream; } public abstract void CancelPendingRead(); public abstract void Complete(Exception? exception = null); public virtual ValueTask CompleteAsync(Exception? exception = null) { try { Complete(exception); return default(ValueTask); } catch (Exception exception2) { return new ValueTask(Task.FromException(exception2)); } } [Obsolete("OnWriterCompleted has been deprecated and may not be invoked on all implementations of PipeReader.")] public virtual void OnWriterCompleted(Action<Exception?, object?> callback, object? state) { } public static PipeReader Create(Stream stream, StreamPipeReaderOptions? readerOptions = null) { return new StreamPipeReader(stream, readerOptions ?? StreamPipeReaderOptions.s_default); } public static PipeReader Create(ReadOnlySequence<byte> sequence) { return new SequencePipeReader(sequence); } public virtual Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default(CancellationToken)) { if (destination == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.destination); } if (cancellationToken.IsCancellationRequested) { return Task.FromCanceled(cancellationToken); } return CopyToAsyncCore(destination, (PipeWriter destination, ReadOnlyMemory<byte> memory, CancellationToken cancellationToken) => destination.WriteAsync(memory, cancellationToken), cancellationToken); } public virtual Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default(CancellationToken)) { if (destination == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.destination); } if (cancellationToken.IsCancellationRequested) { return Task.FromCanceled(cancellationToken); } return CopyToAsyncCore(destination, delegate(Stream destination, ReadOnlyMemory<byte> memory, CancellationToken cancellationToken) { ValueTask writeTask2 = StreamExtensions.WriteAsync(destination, memory, cancellationToken); if (writeTask2.IsCompletedSuccessfully) { writeTask2.GetAwaiter().GetResult(); return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: false)); } return Awaited(writeTask2); }, cancellationToken); static async ValueTask<FlushResult> Awaited(ValueTask writeTask) { await writeTask.ConfigureAwait(continueOnCapturedContext: false); return new FlushResult(isCanceled: false, isCompleted: false); } } private async Task CopyToAsyncCore<TStream>(TStream destination, Func<TStream, ReadOnlyMemory<byte>, CancellationToken, ValueTask<FlushResult>> writeAsync, CancellationToken cancellationToken) { while (true) { ReadResult result = await ReadAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false); ReadOnlySequence<byte> buffer = result.Buffer; SequencePosition position = buffer.Start; SequencePosition consumed = position; try { if (result.IsCanceled) { ThrowHelper.ThrowOperationCanceledException_ReadCanceled(); } ReadOnlyMemory<byte> memory; while (buffer.TryGet(ref position, out memory)) { if (memory.IsEmpty) { consumed = position; continue; } FlushResult flushResult = await writeAsync(destination, memory, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); if (flushResult.IsCanceled) { ThrowHelper.ThrowOperationCanceledException_FlushCanceled(); } consumed = position; if (!flushResult.IsCompleted) { continue; } return; } consumed = buffer.End; if (result.IsCompleted) { break; } } finally { AdvanceTo(consumed); } } } } [DebuggerDisplay("State = {_state}")] internal struct PipeOperationState { [Flags] internal enum State : byte { Reading = 1, ReadingTentative = 2, Writing = 4 } private State _state; public bool IsWritingActive => (_state & State.Writing) == State.Writing; public bool IsReadingActive => (_state & State.Reading) == State.Reading; [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginRead() { if ((_state & State.Reading) == State.Reading) { ThrowHelper.ThrowInvalidOperationException_AlreadyReading(); } _state |= State.Reading; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginReadTentative() { if ((_state & State.Reading) == State.Reading) { ThrowHelper.ThrowInvalidOperationException_AlreadyReading(); } _state |= State.ReadingTentative; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndRead() { if ((_state & State.Reading) != State.Reading && (_state & State.ReadingTentative) != State.ReadingTentative) { ThrowHelper.ThrowInvalidOperationException_NoReadToComplete(); } _state &= ~(State.Reading | State.ReadingTentative); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginWrite() { _state |= State.Writing; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndWrite() { _state &= ~State.Writing; } } internal sealed class PipeReaderStream : Stream { private readonly PipeReader _pipeReader; public override bool CanRead => true; public override bool CanSeek => false; public override bool CanWrite => false; public override long Length { get { throw new NotSupportedException(); } } public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } } internal bool LeaveOpen { get; set; } public PipeReaderStream(PipeReader pipeReader, bool leaveOpen) { _pipeReader = pipeReader; LeaveOpen = leaveOpen; } protected override void Dispose(bool disposing) { if (!LeaveOpen) { _pipeReader.Complete(); } base.Dispose(disposing); } public override void Flush() { } public override int Read(byte[] buffer, int offset, int count) { if (buffer == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer); } return ReadInternal(new Span<byte>(buffer, offset, count)); } public override int ReadByte() { Span<byte> buffer = stackalloc byte[1]; if (ReadInternal(buffer) != 0) { return buffer[0]; } return -1; } private int ReadInternal(Span<byte> buffer) { ValueTask<ReadResult> valueTask = _pipeReader.ReadAsync(); ReadResult result = (valueTask.IsCompletedSuccessfully ? valueTask.Result : valueTask.AsTask().GetAwaiter().GetResult()); return HandleReadResult(result, buffer); } public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } public override void SetLength(long value) { throw new NotSupportedException(); } public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); } public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) { return TaskToAsyncResult.Begin(ReadAsync(buffer, offset, count, default(CancellationToken)), callback, state); } public sealed override int EndRead(IAsyncResult asyncResult) { return TaskToAsyncResult.End<int>(asyncResult); } public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { if (buffer == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer); } return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask(); } private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken) { return HandleReadResult(await _pipeReader.ReadAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false), buffer.Span); } private int HandleReadResult(ReadResult result, Span<byte> buffer) { if (result.IsCanceled) { ThrowHelper.ThrowOperationCanceledException_ReadCanceled(); } ReadOnlySequence<byte> buffer2 = result.Buffer; long length = buffer2.Length; SequencePosition consumed = buffer2.Start; try { if (length != 0L) { int num = (int)Math.Min(length, buffer.Length); ReadOnlySequence<byte> source = ((num == length) ? buffer2 : buffer2.Slice(0, num)); consumed = source.End; source.CopyTo(buffer); return num; } if (result.IsCompleted) { return 0; } } finally { _pipeReader.AdvanceTo(consumed); } ThrowHelper.ThrowInvalidOperationException_InvalidZeroByteRead(); return 0; } public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) { StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize); return _pipeReader.CopyToAsync(destination, cancellationToken); } } public abstract class PipeScheduler { private static readonly ThreadPoolScheduler s_threadPoolScheduler = new ThreadPoolScheduler(); private static readonly InlineScheduler s_inlineScheduler = new InlineScheduler(); public static PipeScheduler ThreadPool => s_threadPoolScheduler; public static PipeScheduler Inline => s_inlineScheduler; public abstract void Schedule(Action<object?> action, object? state); internal virtual void UnsafeSchedule(Action<object?> action, object? state) { Schedule(action, state); } } public abstract class PipeWriter : IBufferWriter<byte> { private PipeWriterStream _stream; public virtual bool CanGetUnflushedBytes => false; public virtual long UnflushedBytes { get { throw ThrowHelper.CreateNotSupportedException_UnflushedBytes(); } } public abstract void Complete(Exception? exception = null); public virtual ValueTask CompleteAsync(Exception? exception = null) { try { Complete(exception); return default(ValueTask); } catch (Exception exception2) { return new ValueTask(Task.FromException(exception2)); } } public abstract void CancelPendingFlush(); [Obsolete("OnReaderCompleted has been deprecated and may not be invoked on all implementations of PipeWriter.")] public virtual void OnReaderCompleted(Action<Exception?, object?> callback, object? state) { } public abstract ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default(CancellationToken)); public abstract void Advance(int bytes); public abstract Memory<byte> GetMemory(int sizeHint = 0); public abstract Span<byte> GetSpan(int sizeHint = 0); public virtual Stream AsStream(bool leaveOpen = false) { if (_stream == null) { _stream = new PipeWriterStream(this, leaveOpen); } else if (leaveOpen) { _stream.LeaveOpen = leaveOpen; } return _stream; } public static PipeWriter Create(Stream stream, StreamPipeWriterOptions? writerOptions = null) { return new StreamPipeWriter(stream, writerOptions ?? StreamPipeWriterOptions.s_default); } public virtual ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken)) { this.Write(source.Span); return FlushAsync(cancellationToken); } protected internal virtual async Task CopyFromAsync(Stream source, CancellationToken cancellationToken = default(CancellationToken)) { FlushResult flushResult; do { Memory<byte> memory = GetMemory(); int num = await StreamExtensions.ReadAsync(source, memory, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); if (num != 0) { Advance(num); flushResult = await FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false); if (flushResult.IsCanceled) { ThrowHelper.ThrowOperationCanceledException_FlushCanceled(); } continue; } break; } while (!flushResult.IsCompleted); } } internal sealed class PipeWriterStream : Stream { private readonly PipeWriter _pipeWriter; internal bool LeaveOpen { get; set; } public override bool CanRead => false; public override bool CanSeek => false; public override bool CanWrite => true; public override long Length { get { throw new NotSupportedException(); } } public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } } public PipeWriterStream(PipeWriter pipeWriter, bool leaveOpen) { _pipeWriter = pipeWriter; LeaveOpen = leaveOpen; } protected override void Dispose(bool disposing) { if (!LeaveOpen) { _pipeWriter.Complete(); } } public override void Flush() { FlushAsync().GetAwaiter().GetResult(); } public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); } public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } public override void SetLength(long value) { throw new NotSupportedException(); } public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) { return TaskToAsyncResult.Begin(WriteAsync(buffer, offset, count, default(CancellationToken)), callback, state); } public sealed override void EndWrite(IAsyncResult asyncResult) { TaskToAsyncResult.End(asyncResult); } public override void Write(byte[] buffer, int offset, int count) { WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); } public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { if (buffer == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer); } return GetFlushResultAsTask(_pipeWriter.WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken)); } public override Task FlushAsync(CancellationToken cancellationToken) { return GetFlushResultAsTask(_pipeWriter.FlushAsync(cancellationToken)); } private static Task GetFlushResultAsTask(ValueTask<FlushResult> valueTask) { if (valueTask.IsCompletedSuccessfully) { if (valueTask.Result.IsCanceled) { ThrowHelper.ThrowOperationCanceledException_FlushCanceled(); } return Task.CompletedTask; } return AwaitTask(valueTask); static async Task AwaitTask(ValueTask<FlushResult> valueTask) { if ((await valueTask.ConfigureAwait(continueOnCapturedContext: false)).IsCanceled) { ThrowHelper.ThrowOperationCanceledException_FlushCanceled(); } } } } public readonly struct ReadResult { internal readonly ReadOnlySequence<byte> _resultBuffer; internal readonly ResultFlags _resultFlags; public ReadOnlySequence<byte> Buffer => _resultBuffer; public bool IsCanceled => (_resultFlags & ResultFlags.Canceled) != 0; public bool IsCompleted => (_resultFlags & ResultFlags.Completed) != 0; public ReadResult(ReadOnlySequence<byte> buffer, bool isCanceled, bool isCompleted) { _resultBuffer = buffer; _resultFlags = ResultFlags.None; if (isCompleted) { _resultFlags |= ResultFlags.Completed; } if (isCanceled) { _resultFlags |= ResultFlags.Canceled; } } } [Flags] internal enum ResultFlags : byte { None = 0, Canceled = 1, Completed = 2 } internal sealed class SequencePipeReader : PipeReader { private ReadOnlySequence<byte> _sequence; private bool _isReaderCompleted; private int _cancelNext; public SequencePipeReader(ReadOnlySequence<byte> sequence) { _sequence = sequence; } public override void AdvanceTo(SequencePosition consumed) { AdvanceTo(consumed, consumed); } public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) { ThrowIfCompleted(); if (consumed.Equals(_sequence.End)) { _sequence = ReadOnlySequence<byte>.Empty; } else { _sequence = _sequence.Slice(consumed); } } public override void CancelPendingRead() { Interlocked.Exchange(ref _cancelNext, 1); } public override void Complete(Exception? exception = null) { if (!_isReaderCompleted) { _isReaderCompleted = true; _sequence = ReadOnlySequence<byte>.Empty; } } public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken)) { if (TryRead(out var result)) { return new ValueTask<ReadResult>(result); } result = new ReadResult(ReadOnlySequence<byte>.Empty, isCanceled: false, isCompleted: true); return new ValueTask<ReadResult>(result); } public override bool TryRead(out ReadResult result) { ThrowIfCompleted(); bool flag = Interlocked.Exchange(ref _cancelNext, 0) == 1; if (flag || _sequence.Length > 0) { result = new ReadResult(_sequence, flag, isCompleted: true); return true; } result = default(ReadResult); return false; } private void ThrowIfCompleted() { if (_isReaderCompleted) { ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); } } } public static class StreamPipeExtensions { public static Task CopyToAsync(this Stream source, PipeWriter destination, CancellationToken cancellationToken = default(CancellationToken)) { if (source == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.source); } if (destination == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.destination); } if (cancellationToken.IsCancellationRequested) { return Task.FromCanceled(cancellationToken); } return destination.CopyFromAsync(source, cancellationToken); } } internal sealed class StreamPipeReader : PipeReader { internal const int InitialSegmentPoolSize = 4; internal const int MaxSegmentPoolSize = 256; private CancellationTokenSource _internalTokenSource; private bool _isReaderCompleted; private BufferSegment _readHead; private int _readIndex; private BufferSegment _readTail; private long _bufferedBytes; private bool _examinedEverything; private readonly object _lock = new object(); private BufferSegmentStack _bufferSegmentPool; private readonly StreamPipeReaderOptions _options; private bool LeaveOpen => _options.LeaveOpen; private bool UseZeroByteReads => _options.UseZeroByteReads; private int BufferSize => _options.BufferSize; private int MaxBufferSize => _options.MaxBufferSize; private int MinimumReadThreshold => _options.MinimumReadSize; private MemoryPool<byte> Pool => _options.Pool; public Stream InnerStream { get; } private CancellationTokenSource InternalTokenSource { get { lock (_lock) { return _internalTokenSource ?? (_internalTokenSource = new CancellationTokenSource()); } } } public StreamPipeReader(Stream readingStream, StreamPipeReaderOptions options) { if (readingStream == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.readingStream); } if (options == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options); } InnerStream = readingStream; _options = options; _bufferSegmentPool = new BufferSegmentStack(4); } public override void AdvanceTo(SequencePosition consumed) { AdvanceTo(consumed, consumed); } public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) { ThrowIfCompleted(); AdvanceTo((BufferSegment)consumed.GetObject(), consumed.GetInteger(), (BufferSegment)examined.GetObject(), examined.GetInteger()); } private void AdvanceTo(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex) { if (consumedSegment != null && examinedSegment != null) { if (_readHead == null) { ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor(); } BufferSegment bufferSegment = _readHead; BufferSegment bufferSegment2 = consumedSegment; long length = BufferSegment.GetLength(bufferSegment, _readIndex, consumedSegment, consumedIndex); _bufferedBytes -= length; _examinedEverything = false; if (examinedSegment == _readTail) { _examinedEverything = examinedIndex == _readTail.End; } if (_bufferedBytes == 0L) { bufferSegment2 = null; _readHead = null; _readTail = null; _readIndex = 0; } else if (consumedIndex == bufferSegment2.Length) { BufferSegment bufferSegment3 = (_readHead = bufferSegment2.NextSegment); _readIndex = 0; bufferSegment2 = bufferSegment3; } else { _readHead = consumedSegment; _readIndex = consumedIndex; } while (bufferSegment != bufferSegment2) { BufferSegment? nextSegment = bufferSegment.NextSegment; ReturnSegmentUnsynchronized(bufferSegment); bufferSegment = nextSegment; } } } public override void CancelPendingRead() { InternalTokenSource.Cancel(); } public override void Complete(Exception? exception = null) { if (CompleteAndGetNeedsDispose()) { InnerStream.Dispose(); } } private bool CompleteAndGetNeedsDispose() { if (_isReaderCompleted) { return false; } _isReaderCompleted = true; BufferSegment bufferSegment = _readHead; while (bufferSegment != null) { BufferSegment bufferSegment2 = bufferSegment; bufferSegment = bufferSegment.NextSegment; bufferSegment2.Reset(); } return !LeaveOpen; } public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken)) { return ReadInternalAsync(null, cancellationToken); } protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken) { return ReadInternalAsync(minimumSize, cancellationToken); } private ValueTask<ReadResult> ReadInternalAsync(int? minimumSize, CancellationToken cancellationToken) { ThrowIfCompleted(); if (cancellationToken.IsCancellationRequested) { return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(cancellationToken)); } CancellationTokenSource internalTokenSource = InternalTokenSource; if (TryReadInternal(internalTokenSource, out var result) && (!minimumSize.HasValue || result.Buffer.Length >= minimumSize || result.IsCompleted || result.IsCanceled)) { return new ValueTask<ReadResult>(result); } return Core(this, minimumSize, internalTokenSource, cancellationToken); static async ValueTask<ReadResult> Core(StreamPipeReader reader, int? minimumSize, CancellationTokenSource tokenSource, CancellationToken cancellationToken) { CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration); if (cancellationToken.CanBeCanceled) { cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, delegate(object state) { ((StreamPipeReader)state).Cancel(); }, reader); } using (cancellationTokenRegistration) { bool isCanceled = false; bool isCompleted = false; try { if (reader.UseZeroByteReads && reader._bufferedBytes == 0L) { await StreamExtensions.ReadAsync(reader.InnerStream, Memory<byte>.Empty, tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false); } do { reader.AllocateReadTail(minimumSize); Memory<byte> buffer = reader._readTail.AvailableMemory.Slice(reader._readTail.End); int num = await StreamExtensions.ReadAsync(reader.InnerStream, buffer, tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false); reader._readTail.End += num; reader._bufferedBytes += num; if (num == 0) { isCompleted = true; break; } } while (minimumSize.HasValue && reader._bufferedBytes < minimumSize); } catch (OperationCanceledException ex) { reader.ClearCancellationToken(); if (cancellationToken.IsCancellationRequested) { throw new OperationCanceledException(ex.Message, ex, cancellationToken); } if (!tokenSource.IsCancellationRequested) { throw; } isCanceled = true; } return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, isCompleted); } } } public override async Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default(CancellationToken)) { ThrowIfCompleted(); CancellationTokenSource tokenSource = InternalTokenSource; if (tokenSource.IsCancellationRequested) { ThrowHelper.ThrowOperationCanceledException_ReadCanceled(); } CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration); if (cancellationToken.CanBeCanceled) { cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, delegate(object state) { ((StreamPipeReader)state).Cancel(); }, this); } using (cancellationTokenRegistration) { _ = 1; try { BufferSegment segment = _readHead; int start = _readIndex; try { while (segment != null) { FlushResult flushResult = await destination.WriteAsync(segment.Memory.Slice(start), tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false); if (flushResult.IsCanceled) { ThrowHelper.ThrowOperationCanceledException_FlushCanceled(); } segment = segment.NextSegment; start = 0; if (flushResult.IsCompleted) { return; } } } finally { if (segment != null) { AdvanceTo(segment, segment.End, segment, segment.End); } } await InnerStream.CopyToAsync(destination, tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false); } catch (OperationCanceledException) { ClearCancellationToken(); throw; } } } public override async Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default(CancellationToken)) { ThrowIfCompleted(); CancellationTokenSource tokenSource = InternalTokenSource; if (tokenSource.IsCancellationRequested) { ThrowHelper.ThrowOperationCanceledException_ReadCanceled(); } CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration); if (cancellationToken.CanBeCanceled) { cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, delegate(object state) { ((StreamPipeReader)state).Cancel(); }, this); } using (cancellationTokenRegistration) { _ = 1; try { BufferSegment segment = _readHead; int start = _readIndex; try { while (segment != null) { await StreamExtensions.WriteAsync(destination, segment.Memory.Slice(start), tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false); segment = segment.NextSegment; start = 0; } } finally { if (segment != null) { AdvanceTo(segment, segment.End, segment, segment.End); } } await StreamExtensions.CopyToAsync(InnerStream, destination, tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false); } catch (OperationCanceledException) { ClearCancellationToken(); throw; } } } private void ClearCancellationToken() { lock (_lock) { _internalTokenSource = null; } } private void ThrowIfCompleted() { if (_isReaderCompleted) { ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); } } public override bool TryRead(out ReadResult result) { ThrowIfCompleted(); return TryReadInternal(InternalTokenSource, out result); } private bool TryReadInternal(CancellationTokenSource source, out ReadResult result) { bool isCancellationRequested = source.IsCancellationRequested; if (isCancellationRequested || (_bufferedBytes > 0 && !_examinedEverything)) { if (isCancellationRequested) { ClearCancellationToken(); } ReadOnlySequence<byte> currentReadOnlySequence = GetCurrentReadOnlySequence(); result = new ReadResult(currentReadOnlySequence, isCancellationRequested, isCompleted: false); return true; } result = default(ReadResult); return false; } private ReadOnlySequence<byte> GetCurrentReadOnlySequence() { if (_readHead != null) { return new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End); } return default(ReadOnlySequence<byte>); } private void AllocateReadTail(int? minimumSize = null) { if (_readHead == null) { _readHead = AllocateSegment(minimumSize); _readTail = _readHead; } else if (_readTail.WritableBytes < MinimumReadThreshold) { BufferSegment bufferSegment = AllocateSegment(minimumSize); _readTail.SetNext(bufferSegment); _readTail = bufferSegment; } } private BufferSegment AllocateSegment(int? minimumSize = null) { BufferSegment bufferSegment = CreateSegmentUnsynchronized(); int num = minimumSize ?? BufferSize; int num2 = ((!_options.IsDefaultSharedMemoryPool) ? _options.Pool.MaxBufferSize : (-1)); if (num <= num2) { int segmentSize = GetSegmentSize(num, num2); bufferSegment.SetOwnedMemory(_options.Pool.Rent(segmentSize)); } else { int segmentSize2 = GetSegmentSize(num, MaxBufferSize); bufferSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(segmentSize2)); } return bufferSegment; } private int GetSegmentSize(int sizeHint, int maxBufferSize) { sizeHint = Math.Max(BufferSize, sizeHint); return Math.Min(maxBufferSize, sizeHint); } private BufferSegment CreateSegmentUnsynchronized() { if (_bufferSegmentPool.TryPop(out BufferSegment result)) { return result; } return new BufferSegment(); } private void ReturnSegmentUnsynchronized(BufferSegment segment) { segment.Reset(); if (_bufferSegmentPool.Count < 256) { _bufferSegmentPool.Push(segment); } } private void Cancel() { InternalTokenSource.Cancel(); } } public class StreamPipeReaderOptions { private const int DefaultBufferSize = 4096; internal const int DefaultMaxBufferSize = 2097152; private const int DefaultMinimumReadSize = 1024; internal static readonly StreamPipeReaderOptions s_default = new StreamPipeReaderOptions(); public int BufferSize { get; } internal int MaxBufferSize { get; } = 2097152; public int MinimumReadSize { get; } public MemoryPool<byte> Pool { get; } public bool LeaveOpen { get; } public bool UseZeroByteReads { get; } internal bool IsDefaultSharedMemoryPool { get; } public StreamPipeReaderOptions(MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen) : this(pool, bufferSize, minimumReadSize, leaveOpen, useZeroByteReads: false) { } public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false) { Pool = pool ?? MemoryPool<byte>.Shared; IsDefaultSharedMemoryPool = Pool == MemoryPool<byte>.Shared; int num; if (bufferSize != -1) { if (bufferSize <= 0) { throw new ArgumentOutOfRangeException("bufferSize"); } num = bufferSize; } else { num = 4096; } BufferSize = num; int num2; if (minimumReadSize != -1) { if (minimumReadSize <= 0) { throw new ArgumentOutOfRangeException("minimumReadSize"); } num2 = minimumReadSize; } else { num2 = 1024; } MinimumReadSize = num2; LeaveOpen = leaveOpen; UseZeroByteReads = useZeroByteReads; } } internal sealed class StreamPipeWriter : PipeWriter { internal const int InitialSegmentPoolSize = 4; internal const int MaxSegmentPoolSize = 256; private readonly int _minimumBufferSize; private BufferSegment _head; private BufferSegment _tail; private Memory<byte> _tailMemory; private int _tailBytesBuffered; private long _bytesBuffered; private readonly MemoryPool<byte> _pool; private readonly int _maxPooledBufferSize; private CancellationTokenSource _internalTokenSource; private bool _isCompleted; private readonly object _lockObject = new object(); private BufferSegmentStack _bufferSegmentPool; private readonly bool _leaveOpen; private CancellationTokenSource InternalTokenSource { get { lock (_lockObject) { return _internalTokenSource ?? (_internalTokenSource = new CancellationTokenSource()); } } } public Stream InnerStream { get; } public override bool CanGetUnflushedBytes => true; public override long UnflushedBytes => _bytesBuffered; public StreamPipeWriter(Stream writingStream, StreamPipeWriterOptions options) { if (writingStream == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.writingStream); } if (options == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options); } InnerStream = writingStream; _minimumBufferSize = options.MinimumBufferSize; _pool = ((options.Pool == MemoryPool<byte>.Shared) ? null : options.Pool); _maxPooledBufferSize = _pool?.MaxBufferSize ?? (-1); _bufferSegmentPool = new BufferSegmentStack(4); _leaveOpen = options.LeaveOpen; } public override void Advance(int bytes) { if ((uint)bytes > (uint)_tailMemory.Length) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes); } _tailBytesBuffered += bytes; _bytesBuffered += bytes; _tailMemory = _tailMemory.Slice(bytes); } public override Memory<byte> GetMemory(int sizeHint = 0) { if (_isCompleted) { ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); } if (sizeHint < 0) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint); } AllocateMemory(sizeHint); return _tailMemory; } public override Span<byte> GetSpan(int sizeHint = 0) { if (_isCompleted) { ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); } if (sizeHint < 0) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint); } Allocat