// Decompiled source of Aeron Client v1.40.0
// BepInEx/core/Aeron.Client/netstandard2.0/Adaptive.Aeron.dll
// Decompiled 2 months ago
// The result has been truncated due to its large size; download it to view the full contents.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.IO;
using System.IO.MemoryMappedFiles;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using System.Runtime.Versioning;
using System.Text;
using System.Threading;
using Adaptive.Aeron.Command;
using Adaptive.Aeron.Exceptions;
using Adaptive.Aeron.LogBuffer;
using Adaptive.Aeron.Status;
using Adaptive.Agrona;
using Adaptive.Agrona.Collections;
using Adaptive.Agrona.Concurrent;
using Adaptive.Agrona.Concurrent.Broadcast;
using Adaptive.Agrona.Concurrent.Errors;
using Adaptive.Agrona.Concurrent.RingBuffer;
using Adaptive.Agrona.Concurrent.Status;
using Adaptive.Agrona.Util;

[assembly: CompilationRelaxations(8)]
[assembly: RuntimeCompatibility(WrapNonExceptionThrows = true)]
[assembly: Debuggable(DebuggableAttribute.DebuggingModes.IgnoreSymbolStoreSequencePoints)]
[assembly: InternalsVisibleTo("Adaptive.Aeron.Tests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
[assembly: TargetFramework(".NETStandard,Version=v2.0", FrameworkDisplayName = "")]
[assembly: AssemblyCompany("Adaptive Financial Consulting Ltd.")]
[assembly: AssemblyConfiguration("Release")]
[assembly: AssemblyCopyright("Copyright 2023")]
[assembly: AssemblyDescription("Efficient reliable UDP unicast, UDP multicast, and IPC transport protocol.")]
[assembly: AssemblyFileVersion("1.40.0.0")]
[assembly: AssemblyInformationalVersion("1.40.0")]
[assembly: AssemblyProduct("Aeron Client")]
[assembly: AssemblyTitle("Adaptive.Aeron")]
[assembly: AssemblyVersion("1.40.0.0")]

namespace Adaptive.Aeron
{
    /// <summary>
    /// Aeron client. Instances are created through <c>Connect</c>, which concludes the supplied
    /// <see cref="Context"/> and starts the client conductor on an invoker or on its own thread.
    /// </summary>
    public class Aeron : IDisposable
    {
        /// <summary>
        /// Client defaults and the names of the properties that override them.
        /// </summary>
        public static class Configuration
        {
            // NOTE: IdleSleepMs must stay declared before IdleSleepNs, whose initializer reads it.
            public static readonly int IdleSleepMs = 16;

            public static readonly int AWAITING_IDLE_SLEEP_MS = 1;

            public static readonly long IdleSleepNs = NanoUtil.FromMilliseconds((long)IdleSleepMs);

            public static readonly long KeepaliveIntervalNs = NanoUtil.FromMilliseconds(500L);

            public const string RESOURCE_LINGER_DURATION_PROP_NAME = "aeron.client.resource.linger.duration";

            public static readonly long RESOURCE_LINGER_DURATION_DEFAULT_NS = NanoUtil.FromSeconds(3L);

            public const string CLOSE_LINGER_DURATION_PROP_NAME = "aeron.client.close.linger.duration";

            public const long CLOSE_LINGER_DURATION_DEFAULT_NS = 0L;

            public const string PRE_TOUCH_MAPPED_MEMORY_PROP_NAME = "aeron.pre.touch.mapped.memory";

            public const bool PRE_TOUCH_MAPPED_MEMORY_DEFAULT = false;

            /// <summary>
            /// Default error handler: writes the exception to <see cref="Console.Error"/>. For a
            /// <c>DriverTimeoutException</c> it additionally prints a banner and terminates the
            /// process with exit code -1.
            /// </summary>
            public static readonly ErrorHandler DEFAULT_ERROR_HANDLER = throwable =>
            {
                lock (Console.Error)
                {
                    Console.Error.WriteLine(throwable);
                }

                if (throwable is DriverTimeoutException)
                {
                    Console.Error.WriteLine();
                    Console.Error.WriteLine("***");
                    Console.Error.WriteLine("*** Timeout for the Media Driver - is it currently running? exiting");
                    Console.Error.WriteLine("***");
                    Environment.Exit(-1);
                }
            };

            /// <summary>Resource linger duration in nanoseconds, read from configuration with a 3 second default.</summary>
            public static long ResourceLingerDurationNs()
            {
                return Config.GetDurationInNanos(RESOURCE_LINGER_DURATION_PROP_NAME, RESOURCE_LINGER_DURATION_DEFAULT_NS);
            }

            /// <summary>Close linger duration in nanoseconds, read from configuration with a default of zero.</summary>
            public static long CloseLingerDurationNs()
            {
                return Config.GetDurationInNanos(CLOSE_LINGER_DURATION_PROP_NAME, CLOSE_LINGER_DURATION_DEFAULT_NS);
            }

            /// <summary>
            /// Whether mapped memory should be pre-touched: the parsed value of the
            /// "aeron.pre.touch.mapped.memory" property, or <c>false</c> when it is not set.
            /// </summary>
            /// <exception cref="FormatException">The property is set but is not a valid boolean.</exception>
            public static bool PreTouchMappedMemory()
            {
                string property = Config.GetProperty(PRE_TOUCH_MAPPED_MEMORY_PROP_NAME);
                return property != null ? bool.Parse(property) : PRE_TOUCH_MAPPED_MEMORY_DEFAULT;
            }
        }

        /// <summary>
        /// Fluent configuration for an <see cref="Aeron"/> client. Setters return the context itself
        /// so calls can be chained; <see cref="Conclude"/> fills in every unset value and maps the
        /// driver's CnC file. The context owns the mapped buffers and releases them in <see cref="Dispose"/>.
        /// </summary>
        public class Context : IDisposable
        {
            private long _clientId;

            private bool _useConductorAgentInvoker;

            private bool _preTouchMappedMemory = Configuration.PreTouchMappedMemory();

            private AgentInvoker _driverAgentInvoker;

            private ILock _clientLock;

            private IEpochClock _epochClock;

            private INanoClock _nanoClock;

            private IIdleStrategy _idleStrategy;

            private IIdleStrategy _awaitingIdleStrategy;

            private CopyBroadcastReceiver _toClientBuffer;

            private IRingBuffer _toDriverBuffer;

            private DriverProxy _driverProxy;

            private ILogBuffersFactory _logBuffersFactory;

            private ErrorHandler _errorHandler;

            private ErrorHandler _subscriberErrorHandler;

            private AvailableImageHandler _availableImageHandler;

            private UnavailableImageHandler _unavailableImageHandler;

            private AvailableCounterHandler _availableCounterHandler;

            private UnavailableCounterHandler _unavailableCounterHandler;

            private Action _closeHandler;

            private long _keepAliveIntervalNs = Configuration.KeepaliveIntervalNs;

            private long _interServiceTimeoutNs;

            private long _resourceLingerDurationNs = Configuration.ResourceLingerDurationNs();

            private long _closeLingerDurationNs = Configuration.CloseLingerDurationNs();

            private FileInfo _cncFile;

            private string _aeronDirectoryName = GetAeronDirectoryName();

            private DirectoryInfo _aeronDirectory;

            private long _driverTimeoutMs = DRIVER_TIMEOUT_MS;

            private MappedByteBuffer _cncByteBuffer;

            private UnsafeBuffer _cncMetaDataBuffer;

            private UnsafeBuffer _countersMetaDataBuffer;

            private UnsafeBuffer _countersValuesBuffer;

            private IThreadFactory _threadFactory = new DefaultThreadFactory();

            // 0 until Conclude() has been entered, 1 afterwards (flipped with Interlocked.Exchange).
            private int _isConcluded;

            public const string AERON_DIR_PROP_NAME = "aeron.dir";

            public static readonly string AERON_DIR_PROP_DEFAULT;

            public const string IPC_MEDIA = "ipc";

            public const string UDP_MEDIA = "udp";

            public const string IPC_CHANNEL = "aeron:ipc";

            public const string UDP_CHANNEL = "aeron:udp";

            public const string SPY_PREFIX = "aeron-spy:";

            public const string ENDPOINT_PARAM_NAME = "endpoint";

            public const string INTERFACE_PARAM_NAME = "interface";

            public const string DEBUG_TIMEOUT_PROP_NAME = "aeron.debug.timeout";

            public const long DRIVER_TIMEOUT_MS = 10000L;

            public const int NULL_SESSION_ID = -1;

            public const string INITIAL_TERM_ID_PARAM_NAME = "init-term-id";

            public const string TERM_ID_PARAM_NAME = "term-id";

            public const string TERM_OFFSET_PARAM_NAME = "term-offset";

            public const string TERM_LENGTH_PARAM_NAME = "term-length";

            public const string MTU_LENGTH_PARAM_NAME = "mtu";

            public const string TTL_PARAM_NAME = "ttl";

            public const string MDC_CONTROL_PARAM_NAME = "control";

            public const string MDC_CONTROL_MODE_PARAM_NAME = "control-mode";

            public const string MTU_LENGTH_URI_PARAM_NAME = "mtu";

            public const string MDC_CONTROL_MODE = "control-mode";

            public const string MDC_CONTROL_MODE_MANUAL = "manual";

            public const string MDC_CONTROL_MODE_DYNAMIC = "dynamic";

            public const string SESSION_ID_PARAM_NAME = "session-id";

            public const string LINGER_PARAM_NAME = "linger";

            public const string RELIABLE_STREAM_PARAM_NAME = "reliable";

            public const string TAGS_PARAM_NAME = "tags";

            public const string TAG_PREFIX = "tag:";

            public const string SPARSE_PARAM_NAME = "sparse";

            public const string ALIAS_PARAM_NAME = "alias";

            public const string EOS_PARAM_NAME = "eos";

            public const string TETHER_PARAM_NAME = "tether";

            public const string GROUP_PARAM_NAME = "group";

            public const string REJOIN_PARAM_NAME = "rejoin";

            public const string CONGESTION_CONTROL_PARAM_NAME = "cc";

            public const string FLOW_CONTROL_PARAM_NAME = "fc";

            public const string GROUP_TAG_PARAM_NAME = "gtag";

            public const string SPIES_SIMULATE_CONNECTION_PARAM_NAME = "ssc";

            public const string SOCKET_SNDBUF_PARAM_NAME = "so-sndbuf";

            public const string SOCKET_RCVBUF_PARAM_NAME = "so-rcvbuf";

            public const string RECEIVER_WINDOW_LENGTH_PARAM_NAME = "rcv-wnd";

            public const string MEDIA_RCV_TIMESTAMP_OFFSET_PARAM_NAME = "media-rcv-ts-offset";

            public const string CHANNEL_RECEIVE_TIMESTAMP_OFFSET_PARAM_NAME = "channel-rcv-ts-offset";

            public const string CHANNEL_SEND_TIMESTAMP_OFFSET_PARAM_NAME = "channel-snd-ts-offset";

            public const string RESERVED_OFFSET = "reserved";

            public const string FALLBACK_LOGGER_PROP_NAME = "aeron.fallback.logger";

            // Field names for which the debug-timeout override has already been reported once.
            private static readonly ConcurrentDictionary<string, bool> DebugFieldsSeen;

            /// <summary>True once <see cref="Conclude"/> has been entered on this context.</summary>
            public bool IsConcluded => 1 == _isConcluded;

            /// <summary>
            /// Computes the default Aeron directory: "/dev/shm/aeron" on Unix when /dev/shm exists,
            /// otherwise "aeron" under the temp path, in both cases suffixed with "-" and the user name.
            /// </summary>
            static Context()
            {
                DebugFieldsSeen = new ConcurrentDictionary<string, bool>();

                string baseDirName = null;
                if (Environment.OSVersion.Platform == PlatformID.Unix && Directory.Exists("/dev/shm"))
                {
                    baseDirName = "/dev/shm/aeron";
                }

                if (baseDirName == null)
                {
                    baseDirName = Path.Combine(Path.GetTempPath(), "aeron");
                }

                AERON_DIR_PROP_DEFAULT = baseDirName + "-" + Environment.UserName;
            }

            /// <summary>
            /// Writer selected by the "aeron.fallback.logger" property: standard output for "stdout",
            /// standard error for "stderr", an unset property, or any other value.
            /// </summary>
            public static TextWriter FallbackLogger()
            {
                string property = Config.GetProperty(FALLBACK_LOGGER_PROP_NAME, "stderr");
                return property == "stdout" ? Console.Out : Console.Error;
            }

            /// <summary>Value of the "aeron.dir" property, or <see cref="AERON_DIR_PROP_DEFAULT"/> when unset.</summary>
            public static string GetAeronDirectoryName()
            {
                return Config.GetProperty(AERON_DIR_PROP_NAME, AERON_DIR_PROP_DEFAULT);
            }

            /// <summary>Creates the <see cref="DirectoryInfo"/> for the configured directory name if not yet set.</summary>
            /// <returns>This context.</returns>
            public Context ConcludeAeronDirectory()
            {
                if (_aeronDirectory == null)
                {
                    _aeronDirectory = new DirectoryInfo(_aeronDirectoryName);
                }

                return this;
            }

            /// <summary>
            /// Shallow copy via <see cref="object.MemberwiseClone"/>: the clone shares every referenced
            /// object with this context and copies the concluded flag as well.
            /// </summary>
            public Context Clone()
            {
                return (Context)MemberwiseClone();
            }

            /// <summary>
            /// Finalises the configuration: resolves the CnC file, connects to the driver, then fills
            /// every unset collaborator with its default. May be entered only once per context.
            /// </summary>
            /// <returns>This context.</returns>
            /// <exception cref="ConcurrentConcludeException">The context has already been concluded.</exception>
            /// <exception cref="AeronException">A NoOpLock is configured without the conductor agent invoker.</exception>
            /// <exception cref="ConfigurationException">The driver's liveness timeout is not greater than the keep-alive interval.</exception>
            public Context Conclude()
            {
                if (Interlocked.Exchange(ref _isConcluded, 1) != 0)
                {
                    throw new ConcurrentConcludeException();
                }

                ConcludeAeronDirectory();
                _cncFile = new FileInfo(Path.Combine(_aeronDirectory.FullName, "cnc.dat"));

                if (_clientLock == null)
                {
                    _clientLock = new ReentrantLock();
                }
                else if (_clientLock is NoOpLock && !_useConductorAgentInvoker)
                {
                    throw new AeronException("Must use Aeron.Context.UseConductorAgentInvoker(true) when Aeron.Context.ClientLock(...) is using a NoOpLock");
                }

                if (_epochClock == null)
                {
                    _epochClock = SystemEpochClock.INSTANCE;
                }

                if (_nanoClock == null)
                {
                    _nanoClock = SystemNanoClock.INSTANCE;
                }

                if (_idleStrategy == null)
                {
                    _idleStrategy = new SleepingIdleStrategy(Configuration.IdleSleepMs);
                }

                if (_awaitingIdleStrategy == null)
                {
                    _awaitingIdleStrategy = new SleepingIdleStrategy(Configuration.AWAITING_IDLE_SLEEP_MS);
                }

                if (CncFile() != null)
                {
                    ConnectToDriver();
                }

                // The liveness timeout is read from the CnC metadata; keep-alives must arrive more often than it.
                _interServiceTimeoutNs = CncFileDescriptor.ClientLivenessTimeoutNs(_cncMetaDataBuffer);
                if (_interServiceTimeoutNs <= _keepAliveIntervalNs)
                {
                    throw new ConfigurationException("interServiceTimeoutNs=" + _interServiceTimeoutNs + " <= keepAliveIntervalNs=" + _keepAliveIntervalNs);
                }

                if (_toDriverBuffer == null)
                {
                    _toDriverBuffer = new ManyToOneRingBuffer(CncFileDescriptor.CreateToDriverBuffer(_cncByteBuffer, _cncMetaDataBuffer));
                }

                if (_toClientBuffer == null)
                {
                    _toClientBuffer = new CopyBroadcastReceiver(new BroadcastReceiver(CncFileDescriptor.CreateToClientsBuffer(_cncByteBuffer, _cncMetaDataBuffer)));
                }

                if (CountersMetaDataBuffer() == null)
                {
                    CountersMetaDataBuffer(CncFileDescriptor.CreateCountersMetaDataBuffer(_cncByteBuffer, _cncMetaDataBuffer));
                }

                if (CountersValuesBuffer() == null)
                {
                    CountersValuesBuffer(CncFileDescriptor.CreateCountersValuesBuffer(_cncByteBuffer, _cncMetaDataBuffer));
                }

                if (_logBuffersFactory == null)
                {
                    _logBuffersFactory = new MappedLogBuffersFactory();
                }

                if (_errorHandler == null)
                {
                    _errorHandler = Configuration.DEFAULT_ERROR_HANDLER;
                }

                if (_subscriberErrorHandler == null)
                {
                    _subscriberErrorHandler = _errorHandler;
                }

                if (_availableImageHandler == null)
                {
                    _availableImageHandler = delegate { };
                }

                if (_unavailableImageHandler == null)
                {
                    _unavailableImageHandler = delegate { };
                }

                if (_driverProxy == null)
                {
                    // The client id is the next correlation id taken from the to-driver ring buffer.
                    _clientId = _toDriverBuffer.NextCorrelationId();
                    _driverProxy = new DriverProxy(ToDriverBuffer(), _clientId);
                }

                return this;
            }

            /// <summary>Client id assigned while concluding.</summary>
            public long ClientId() => _clientId;

            /// <summary>The CnC file resolved while concluding; null before that.</summary>
            public FileInfo CncFile() => _cncFile;

            /// <summary>Sets whether the conductor runs on an agent invoker instead of its own thread.</summary>
            public Context UseConductorAgentInvoker(bool useConductorAgentInvoker)
            {
                _useConductorAgentInvoker = useConductorAgentInvoker;
                return this;
            }

            /// <summary>Whether the conductor runs on an agent invoker.</summary>
            public bool UseConductorAgentInvoker() => _useConductorAgentInvoker;

            /// <summary>Sets the pre-touch mapped memory flag.</summary>
            public Context PreTouchMappedMemory(bool preTouchMappedMemory)
            {
                _preTouchMappedMemory = preTouchMappedMemory;
                return this;
            }

            /// <summary>The pre-touch mapped memory flag.</summary>
            public bool PreTouchMappedMemory() => _preTouchMappedMemory;

            /// <summary>Sets the driver agent invoker.</summary>
            public Context DriverAgentInvoker(AgentInvoker driverAgentInvoker)
            {
                _driverAgentInvoker = driverAgentInvoker;
                return this;
            }

            /// <summary>The driver agent invoker, or null when none was set.</summary>
            public AgentInvoker DriverAgentInvoker() => _driverAgentInvoker;

            /// <summary>Sets the client lock; when left unset, Conclude installs a ReentrantLock.</summary>
            public Context ClientLock(ILock @lock)
            {
                _clientLock = @lock;
                return this;
            }

            /// <summary>The client lock.</summary>
            public ILock ClientLock() => _clientLock;

            /// <summary>Sets the epoch clock; when left unset, Conclude installs SystemEpochClock.INSTANCE.</summary>
            public Context EpochClock(IEpochClock clock)
            {
                _epochClock = clock;
                return this;
            }

            /// <summary>The epoch clock.</summary>
            public IEpochClock EpochClock() => _epochClock;

            /// <summary>Sets the nano clock; when left unset, Conclude installs SystemNanoClock.INSTANCE.</summary>
            public Context NanoClock(INanoClock clock)
            {
                _nanoClock = clock;
                return this;
            }

            /// <summary>The nano clock.</summary>
            public INanoClock NanoClock() => _nanoClock;

            /// <summary>Sets the idle strategy; when left unset, Conclude installs a SleepingIdleStrategy.</summary>
            public Context IdleStrategy(IIdleStrategy idleStrategy)
            {
                _idleStrategy = idleStrategy;
                return this;
            }

            /// <summary>The idle strategy.</summary>
            public IIdleStrategy IdleStrategy() => _idleStrategy;

            /// <summary>Sets the awaiting idle strategy; when left unset, Conclude installs a SleepingIdleStrategy.</summary>
            public Context AwaitingIdleStrategy(IIdleStrategy idleStrategy)
            {
                _awaitingIdleStrategy = idleStrategy;
                return this;
            }

            /// <summary>The awaiting idle strategy.</summary>
            public IIdleStrategy AwaitingIdleStrategy() => _awaitingIdleStrategy;

            /// <summary>Overrides the receiver for the to-clients broadcast buffer.</summary>
            internal Context ToClientBuffer(CopyBroadcastReceiver toClientBuffer)
            {
                _toClientBuffer = toClientBuffer;
                return this;
            }

            /// <summary>Receiver for the to-clients broadcast buffer.</summary>
            public CopyBroadcastReceiver ToClientBuffer() => _toClientBuffer;

            /// <summary>Ring buffer for commands to the driver; set once the driver connection is established.</summary>
            public IRingBuffer ToDriverBuffer() => _toDriverBuffer;

            /// <summary>Overrides the driver proxy.</summary>
            internal Context DriverProxy(DriverProxy driverProxy)
            {
                _driverProxy = driverProxy;
                return this;
            }

            /// <summary>The driver proxy.</summary>
            public DriverProxy DriverProxy() => _driverProxy;

            /// <summary>Overrides the log buffers factory.</summary>
            internal Context LogBuffersFactory(ILogBuffersFactory logBuffersFactory)
            {
                _logBuffersFactory = logBuffersFactory;
                return this;
            }

            /// <summary>The log buffers factory.</summary>
            public ILogBuffersFactory LogBuffersFactory() => _logBuffersFactory;

            /// <summary>Sets the error handler; when left unset, Conclude installs Configuration.DEFAULT_ERROR_HANDLER.</summary>
            public Context ErrorHandler(ErrorHandler errorHandler)
            {
                _errorHandler = errorHandler;
                return this;
            }

            /// <summary>The error handler.</summary>
            public ErrorHandler ErrorHandler() => _errorHandler;

            /// <summary>Sets the subscriber error handler; when left unset, Conclude reuses the error handler.</summary>
            public Context SubscriberErrorHandler(ErrorHandler errorHandler)
            {
                _subscriberErrorHandler = errorHandler;
                return this;
            }

            /// <summary>The subscriber error handler.</summary>
            public ErrorHandler SubscriberErrorHandler() => _subscriberErrorHandler;

            /// <summary>Sets the available-image handler; when left unset, Conclude installs a no-op.</summary>
            public Context AvailableImageHandler(AvailableImageHandler handler)
            {
                _availableImageHandler = handler;
                return this;
            }

            /// <summary>The available-image handler.</summary>
            public AvailableImageHandler AvailableImageHandler() => _availableImageHandler;

            /// <summary>Sets the unavailable-image handler; when left unset, Conclude installs a no-op.</summary>
            public Context UnavailableImageHandler(UnavailableImageHandler handler)
            {
                _unavailableImageHandler = handler;
                return this;
            }

            /// <summary>The unavailable-image handler.</summary>
            public UnavailableImageHandler UnavailableImageHandler() => _unavailableImageHandler;

            /// <summary>Sets the available-counter handler.</summary>
            public Context AvailableCounterHandler(AvailableCounterHandler handler)
            {
                _availableCounterHandler = handler;
                return this;
            }

            /// <summary>The available-counter handler, or null when none was set.</summary>
            public AvailableCounterHandler AvailableCounterHandler() => _availableCounterHandler;

            /// <summary>Sets the unavailable-counter handler.</summary>
            public Context UnavailableCounterHandler(UnavailableCounterHandler handler)
            {
                _unavailableCounterHandler = handler;
                return this;
            }

            /// <summary>The unavailable-counter handler, or null when none was set.</summary>
            public UnavailableCounterHandler UnavailableCounterHandler() => _unavailableCounterHandler;

            /// <summary>Sets the close handler.</summary>
            public Context CloseHandler(Action handler)
            {
                _closeHandler = handler;
                return this;
            }

            /// <summary>The close handler, or null when none was set.</summary>
            public Action CloseHandler() => _closeHandler;

            /// <summary>Counters metadata buffer.</summary>
            public UnsafeBuffer CountersMetaDataBuffer() => _countersMetaDataBuffer;

            /// <summary>Sets the counters metadata buffer; when left unset, Conclude creates it from the CnC file.</summary>
            public Context CountersMetaDataBuffer(UnsafeBuffer countersMetaDataBuffer)
            {
                _countersMetaDataBuffer = countersMetaDataBuffer;
                return this;
            }

            /// <summary>Counters values buffer.</summary>
            public UnsafeBuffer CountersValuesBuffer() => _countersValuesBuffer;

            /// <summary>Sets the counters values buffer; when left unset, Conclude creates it from the CnC file.</summary>
            public Context CountersValuesBuffer(UnsafeBuffer countersValuesBuffer)
            {
                _countersValuesBuffer = countersValuesBuffer;
                return this;
            }

            /// <summary>Sets the keep-alive interval in nanoseconds.</summary>
            public Context KeepAliveIntervalNs(long value)
            {
                _keepAliveIntervalNs = value;
                return this;
            }

            /// <summary>Keep-alive interval in nanoseconds.</summary>
            public long KeepAliveIntervalNs() => _keepAliveIntervalNs;

            /// <summary>Sets the driver timeout in milliseconds.</summary>
            public Context DriverTimeoutMs(long driverTimeoutMs)
            {
                _driverTimeoutMs = driverTimeoutMs;
                return this;
            }

            /// <summary>Driver timeout in milliseconds, subject to the debug override of <see cref="CheckDebugTimeout"/>.</summary>
            public long DriverTimeoutMs()
            {
                return CheckDebugTimeout(_driverTimeoutMs, TimeUnit.MILLIS, "DriverTimeoutMs");
            }

            /// <summary>
            /// Returns <paramref name="timeout"/> unless the "aeron.debug.timeout" property is set and a
            /// debugger is attached. In that case the property is parsed as a duration and converted from
            /// nanoseconds to <paramref name="timeUnit"/>. The substitution is written to the console
            /// once per <paramref name="debugFieldName"/>.
            /// </summary>
            /// <param name="timeout">Configured timeout, expressed in <paramref name="timeUnit"/>.</param>
            /// <param name="timeUnit">Unit of <paramref name="timeout"/> and of the returned value.</param>
            /// <param name="debugFieldName">Name used to report the override only once.</param>
            /// <returns>The debug timeout when it applies, otherwise <paramref name="timeout"/>; also
            /// <paramref name="timeout"/> when parsing throws a <see cref="FormatException"/>.</returns>
            public static long CheckDebugTimeout(long timeout, TimeUnit timeUnit, string debugFieldName)
            {
                string debugTimeout = Config.GetProperty(DEBUG_TIMEOUT_PROP_NAME);
                if (debugTimeout == null || !Debugger.IsAttached)
                {
                    return timeout;
                }

                try
                {
                    long debugTimeoutNs = SystemUtil.ParseDuration(DEBUG_TIMEOUT_PROP_NAME, debugTimeout);
                    long result = timeUnit.Convert(debugTimeoutNs, TimeUnit.NANOSECONDS);
                    if (DebugFieldsSeen.TryAdd(debugFieldName, value: true))
                    {
                        Console.WriteLine("Using debug timeout [" + result + "] for " + debugFieldName + " replacing [" + timeout + "]");
                    }

                    return result;
                }
                catch (FormatException)
                {
                    return timeout;
                }
            }

            /// <summary>Overrides the inter-service timeout in nanoseconds.</summary>
            internal Context InterServiceTimeoutNs(long interServiceTimeout)
            {
                _interServiceTimeoutNs = interServiceTimeout;
                return this;
            }

            /// <summary>Inter-service timeout in nanoseconds, subject to the debug override of <see cref="CheckDebugTimeout"/>.</summary>
            public long InterServiceTimeoutNs()
            {
                return CheckDebugTimeout(_interServiceTimeoutNs, TimeUnit.NANOSECONDS, "InterServiceTimeoutNs");
            }

            /// <summary>Sets the resource linger duration in nanoseconds.</summary>
            public Context ResourceLingerDurationNs(long resourceLingerDurationNs)
            {
                _resourceLingerDurationNs = resourceLingerDurationNs;
                return this;
            }

            /// <summary>Resource linger duration in nanoseconds.</summary>
            public long ResourceLingerDurationNs() => _resourceLingerDurationNs;

            /// <summary>Sets the close linger duration in nanoseconds.</summary>
            public Context CloseLingerDurationNs(long closeLingerDurationNs)
            {
                _closeLingerDurationNs = closeLingerDurationNs;
                return this;
            }

            /// <summary>Close linger duration in nanoseconds.</summary>
            public long CloseLingerDurationNs() => _closeLingerDurationNs;

            /// <summary>Configured Aeron directory name.</summary>
            public string AeronDirectoryName() => _aeronDirectoryName;

            /// <summary>Aeron directory; null until <see cref="ConcludeAeronDirectory"/> has run.</summary>
            public DirectoryInfo AeronDirectory() => _aeronDirectory;

            /// <summary>Sets the Aeron directory name.</summary>
            public Context AeronDirectoryName(string dirName)
            {
                _aeronDirectoryName = dirName;
                return this;
            }

            /// <summary>Sets the thread factory used to start the conductor thread.</summary>
            public Context ThreadFactory(IThreadFactory threadFactory)
            {
                _threadFactory = threadFactory;
                return this;
            }

            /// <summary>The thread factory.</summary>
            public IThreadFactory ThreadFactory() => _threadFactory;

            /// <summary>
            /// Releases the mapped CnC buffer and the metadata and counters buffers. The CnC buffer
            /// reference is cleared, so calling this again does not dispose the mapping a second time.
            /// </summary>
            public void Dispose()
            {
                _cncByteBuffer?.Dispose();
                _cncByteBuffer = null;

                _cncMetaDataBuffer?.Dispose();
                _countersMetaDataBuffer?.Dispose();
                _countersValuesBuffer?.Dispose();
            }

            /// <summary>Multi-line dump of every configuration value held by this context.</summary>
            public override string ToString()
            {
                return "Aeron.Context\n{\n isConcluded=" + _isConcluded
                    + "\n aeronDirectory=" + AeronDirectory()?.ToString()
                    + "\n aeronDirectoryName='" + AeronDirectoryName()
                    + "'\n cncFile=" + CncFile()?.ToString()
                    + "\n countersMetaDataBuffer=" + CountersMetaDataBuffer()?.ToString()
                    + "\n countersValuesBuffer=" + CountersValuesBuffer()?.ToString()
                    + "\n driverTimeoutMs=" + DriverTimeoutMs()
                    + "\n clientId=" + _clientId
                    + "\n useConductorAgentInvoker=" + _useConductorAgentInvoker
                    + "\n preTouchMappedMemory=" + _preTouchMappedMemory
                    + "\n driverAgentInvoker=" + _driverAgentInvoker?.ToString()
                    + "\n clientLock=" + _clientLock?.ToString()
                    + "\n epochClock=" + _epochClock?.ToString()
                    + "\n nanoClock=" + _nanoClock?.ToString()
                    + "\n idleStrategy=" + _idleStrategy?.ToString()
                    + "\n awaitingIdleStrategy=" + _awaitingIdleStrategy?.ToString()
                    + "\n toClientBuffer=" + _toClientBuffer?.ToString()
                    + "\n toDriverBuffer=" + _toDriverBuffer?.ToString()
                    + "\n driverProxy=" + _driverProxy?.ToString()
                    + "\n cncByteBuffer=" + _cncByteBuffer?.ToString()
                    + "\n cncMetaDataBuffer=" + _cncMetaDataBuffer?.ToString()
                    + "\n logBuffersFactory=" + _logBuffersFactory?.ToString()
                    + "\n errorHandler=" + _errorHandler?.ToString()
                    + "\n subscriberErrorHandler=" + _subscriberErrorHandler?.ToString()
                    + "\n availableImageHandler=" + _availableImageHandler?.ToString()
                    + "\n unavailableImageHandler=" + _unavailableImageHandler?.ToString()
                    + "\n availableCounterHandler=" + _availableCounterHandler?.ToString()
                    + "\n unavailableCounterHandler=" + _unavailableCounterHandler?.ToString()
                    + "\n closeHandler=" + _closeHandler?.ToString()
                    + "\n keepAliveIntervalNs=" + _keepAliveIntervalNs
                    + "\n interServiceTimeoutNs=" + _interServiceTimeoutNs
                    + "\n resourceLingerDurationNs=" + _resourceLingerDurationNs
                    + "\n closeLingerDurationNs=" + _closeLingerDurationNs
                    + "\n threadFactory=" + _threadFactory?.ToString()
                    + "\n}";
            }

            /// <summary>
            /// Maps the CnC file and waits, up to the driver timeout, until it is initialised, long
            /// enough, and shows a recent driver heartbeat. On success the to-driver ring buffer is set.
            /// </summary>
            /// <exception cref="DriverTimeoutException">The deadline passed before a live driver was seen.</exception>
            /// <exception cref="AeronException">The driver's CnC minor version is lower than the client's.</exception>
            private void ConnectToDriver()
            {
                long deadlineMs = _epochClock.Time() + DriverTimeoutMs();
                FileInfo cncFile = CncFile();

                while (_toDriverBuffer == null)
                {
                    cncFile.Refresh();
                    _cncByteBuffer = WaitForFileMapping(cncFile, _epochClock, deadlineMs);
                    _cncMetaDataBuffer = CncFileDescriptor.CreateMetaDataBuffer(_cncByteBuffer);

                    // A version of zero means the file has not been initialised yet.
                    int cncVersion;
                    while ((cncVersion = _cncMetaDataBuffer.GetIntVolatile(CncFileDescriptor.CncVersionOffset(0))) == 0)
                    {
                        if (_epochClock.Time() > deadlineMs)
                        {
                            throw new DriverTimeoutException("CnC file is created but not initialised");
                        }

                        Sleep(Configuration.AWAITING_IDLE_SLEEP_MS);
                    }

                    CncFileDescriptor.CheckVersion(cncVersion);
                    if (SemanticVersion.Minor(cncVersion) < SemanticVersion.Minor(CncFileDescriptor.CNC_VERSION))
                    {
                        throw new AeronException("driverVersion=" + SemanticVersion.ToString(cncVersion) + " insufficient for clientVersion=" + SemanticVersion.ToString(CncFileDescriptor.CNC_VERSION));
                    }

                    if (!CncFileDescriptor.IsCncFileLengthSufficient(_cncMetaDataBuffer, _cncByteBuffer.Capacity))
                    {
                        // The file is shorter than its metadata requires: drop the mapping and map again.
                        _cncByteBuffer?.Dispose();
                        _cncByteBuffer = null;
                        _cncMetaDataBuffer = null;
                        Sleep(Configuration.AWAITING_IDLE_SLEEP_MS);
                        continue;
                    }

                    ManyToOneRingBuffer ringBuffer = new ManyToOneRingBuffer(CncFileDescriptor.CreateToDriverBuffer(_cncByteBuffer, _cncMetaDataBuffer));
                    while (ringBuffer.ConsumerHeartbeatTime() == 0L)
                    {
                        if (_epochClock.Time() > deadlineMs)
                        {
                            throw new DriverTimeoutException("no driver heartbeat detected.");
                        }

                        Sleep(Configuration.AWAITING_IDLE_SLEEP_MS);
                    }

                    long nowMs = _epochClock.Time();
                    if (ringBuffer.ConsumerHeartbeatTime() < nowMs - DriverTimeoutMs())
                    {
                        // The heartbeat is older than the driver timeout: unmap and retry until the deadline.
                        if (nowMs > deadlineMs)
                        {
                            throw new DriverTimeoutException("no driver heartbeat detected.");
                        }

                        IoUtil.Unmap(_cncByteBuffer);
                        _cncByteBuffer = null;
                        _cncMetaDataBuffer = null;
                        Sleep(100);
                    }
                    else
                    {
                        _toDriverBuffer = ringBuffer;
                    }
                }
            }

            /// <summary>
            /// Waits until the CnC file exists and holds at least the metadata header, then maps it.
            /// Every unsuccessful attempt, including a swallowed I/O error while opening, goes through
            /// the same deadline check, sleep and <see cref="FileInfo.Refresh"/> before retrying.
            /// </summary>
            /// <param name="cncFile">The CnC file to map.</param>
            /// <param name="clock">Clock compared against <paramref name="deadLineMs"/>.</param>
            /// <param name="deadLineMs">Deadline on <paramref name="clock"/>'s timeline.</param>
            /// <returns>The mapped file.</returns>
            /// <exception cref="DriverTimeoutException">The deadline passed before the file could be mapped.</exception>
            /// <exception cref="AeronException">Opening or mapping failed with a non-I/O error.</exception>
            private static MappedByteBuffer WaitForFileMapping(FileInfo cncFile, IEpochClock clock, long deadLineMs)
            {
                while (true)
                {
                    // Why this attempt did not produce a mapping; used as the message if the deadline has passed.
                    string pendingReason;

                    if (!cncFile.Exists || cncFile.Length <= 0)
                    {
                        pendingReason = "CnC file not created: " + cncFile.FullName;
                    }
                    else
                    {
                        FileStream fileStream = null;
                        try
                        {
                            fileStream = cncFile.Open(FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite | FileShare.Delete);
                            if (fileStream.Length >= CncFileDescriptor.META_DATA_LENGTH)
                            {
                                MappedByteBuffer mapped = IoUtil.MapExistingFile(fileStream);

                                // The stream is deliberately not disposed after a successful mapping.
                                // NOTE(review): presumably the mapping owns it from here - confirm against IoUtil.MapExistingFile.
                                fileStream = null;
                                return mapped;
                            }

                            pendingReason = "CnC file is created but not populated.";
                        }
                        catch (FileNotFoundException)
                        {
                            // The file vanished between the existence check and the open; retry below.
                            pendingReason = "CnC file not created: " + cncFile.FullName;
                        }
                        catch (IOException)
                        {
                            // For example a sharing violation while the file is being created; retry below.
                            pendingReason = "CnC file could not be opened: " + cncFile.FullName;
                        }
                        catch (Exception innerException)
                        {
                            throw new AeronException("cannot open CnC file", innerException);
                        }
                        finally
                        {
                            // Runs on every path except a successful mapping, so the handle never leaks.
                            fileStream?.Dispose();
                        }
                    }

                    // Thrown outside the try block so the timeout is not re-wrapped by the catch-all above.
                    if (clock.Time() > deadLineMs)
                    {
                        throw new DriverTimeoutException(pendingReason);
                    }

                    Sleep(Configuration.IdleSleepMs);
                    cncFile.Refresh();
                }
            }

            /// <summary>Deletes the Aeron directory via <c>IoUtil.Delete</c>.</summary>
            public void DeleteAeronDirectory()
            {
                IoUtil.Delete(_aeronDirectory, false);
            }
// Aeron.Context members, continued: CnC file inspection, driver liveness checks and error-log helpers.
            /// <summary>
            /// Maps the "cnc.dat" file in the Aeron directory when it exists and is longer than the
            /// metadata header.
            /// </summary>
            /// <param name="logProgress">Optional progress sink; may be null.</param>
            /// <returns>The mapped file, or null when the file is missing or too short.</returns>
            public MappedByteBuffer MapExistingCncFile(Action<string> logProgress)
            {
                FileInfo cncFile = new FileInfo(Path.Combine(_aeronDirectory.FullName, "cnc.dat"));
                if (!cncFile.Exists || cncFile.Length <= CncFileDescriptor.END_OF_METADATA_OFFSET)
                {
                    return null;
                }

                logProgress?.Invoke("INFO: Aeron CnC file " + cncFile + " exists");
                return IoUtil.MapExistingFile(cncFile, "cnc.dat");
            }

            /// <summary>
            /// Checks whether a driver is active for the given Aeron directory by mapping its CnC file
            /// and inspecting the driver heartbeat.
            /// </summary>
            /// <param name="directory">Aeron directory that contains "cnc.dat".</param>
            /// <param name="driverTimeoutMs">Maximum heartbeat age, in milliseconds, still considered active.</param>
            /// <param name="logger">Optional progress sink; may be null.</param>
            /// <returns>True when the heartbeat age is within the timeout; false when the CnC file is missing or too short.</returns>
            public static bool IsDriverActive(DirectoryInfo directory, long driverTimeoutMs, Action<string> logger)
            {
                FileInfo cncFile = new FileInfo(Path.Combine(directory.FullName, "cnc.dat"));
                if (!cncFile.Exists || cncFile.Length <= CncFileDescriptor.END_OF_METADATA_OFFSET)
                {
                    return false;
                }

                logger?.Invoke("INFO: Aeron CnC file " + cncFile + " exists");

                MappedByteBuffer cncByteBuffer = IoUtil.MapExistingFile(cncFile, "CnC file");
                try
                {
                    return IsDriverActive(driverTimeoutMs, logger, cncByteBuffer);
                }
                finally
                {
                    cncByteBuffer?.Dispose();
                }
            }

            /// <summary>
            /// Checks whether a driver is active for this context's Aeron directory.
            /// </summary>
            /// <param name="driverTimeoutMs">Maximum heartbeat age, in milliseconds, still considered active.</param>
            /// <param name="logger">Optional progress sink; may be null.</param>
            public bool IsDriverActive(long driverTimeoutMs, Action<string> logger)
            {
                MappedByteBuffer cncByteBuffer = MapExistingCncFile(logger);
                try
                {
                    return IsDriverActive(driverTimeoutMs, logger, cncByteBuffer);
                }
                finally
                {
                    cncByteBuffer?.Dispose();
                }
            }

            /// <summary>
            /// Checks whether a driver is active by comparing the to-driver consumer heartbeat in the
            /// mapped CnC file against the current time.
            /// </summary>
            /// <param name="driverTimeoutMs">Maximum heartbeat age, in milliseconds, still considered active;
            /// also bounds the wait for the CnC version to become non-zero.</param>
            /// <param name="logger">Optional progress sink; may be null.</param>
            /// <param name="cncByteBuffer">Mapped CnC file; null yields false.</param>
            /// <returns>True when the heartbeat age is no greater than <paramref name="driverTimeoutMs"/>.</returns>
            /// <exception cref="DriverTimeoutException">The CnC version stayed zero for longer than the timeout.</exception>
            public static bool IsDriverActive(long driverTimeoutMs, Action<string> logger, MappedByteBuffer cncByteBuffer)
            {
                if (cncByteBuffer == null)
                {
                    return false;
                }

                UnsafeBuffer cncMetaDataBuffer = CncFileDescriptor.CreateMetaDataBuffer(cncByteBuffer);
                long startTimeMs = UnixTimeConverter.CurrentUnixTimeMillis();

                // A version of zero means the file has not been initialised yet.
                int cncVersion;
                while ((cncVersion = cncMetaDataBuffer.GetIntVolatile(CncFileDescriptor.CncVersionOffset(0))) == 0)
                {
                    if (UnixTimeConverter.CurrentUnixTimeMillis() > startTimeMs + driverTimeoutMs)
                    {
                        throw new DriverTimeoutException("CnC file is created but not initialised.");
                    }

                    Sleep(1);
                }

                CncFileDescriptor.CheckVersion(cncVersion);

                ManyToOneRingBuffer toDriverBuffer = new ManyToOneRingBuffer(CncFileDescriptor.CreateToDriverBuffer(cncByteBuffer, cncMetaDataBuffer));
                long heartbeatMs = toDriverBuffer.ConsumerHeartbeatTime();

                // The age is taken on the same Unix-millisecond clock as the wait loop above. The previous
                // DateTime.Now.ToFileTimeUtc() counts 100 ns ticks since 1601, so the age was never within the timeout.
                long heartbeatAgeMs = UnixTimeConverter.CurrentUnixTimeMillis() - heartbeatMs;

                logger?.Invoke("INFO: Aeron toDriver consumer heartbeat is (ms):" + heartbeatAgeMs);
                return heartbeatAgeMs <= driverTimeoutMs;
            }

            /// <summary>
            /// Asks the driver that owns the CnC file in <paramref name="directory"/> to terminate.
            /// </summary>
            /// <param name="directory">Aeron directory that contains "cnc.dat".</param>
            /// <param name="tokenBuffer">Buffer holding the termination token.</param>
            /// <param name="tokenOffset">Offset of the token within <paramref name="tokenBuffer"/>.</param>
            /// <param name="tokenLength">Length of the token.</param>
            /// <returns>The result of <c>DriverProxy.TerminateDriver</c>; false when the CnC file is missing,
            /// too short, or its version is not greater than zero.</returns>
            public static bool RequestDriverTermination(DirectoryInfo directory, IDirectBuffer tokenBuffer, int tokenOffset, int tokenLength)
            {
                FileInfo cncFile = new FileInfo(Path.Combine(directory.FullName, "cnc.dat"));
                if (!cncFile.Exists || cncFile.Length <= CncFileDescriptor.END_OF_METADATA_OFFSET)
                {
                    return false;
                }

                MappedByteBuffer cncByteBuffer = IoUtil.MapExistingFile(cncFile, "CnC file");
                try
                {
                    UnsafeBuffer cncMetaDataBuffer = CncFileDescriptor.CreateMetaDataBuffer(cncByteBuffer);
                    int cncVersion = cncMetaDataBuffer.GetIntVolatile(CncFileDescriptor.CncVersionOffset(0));
                    if (cncVersion > 0)
                    {
                        CncFileDescriptor.CheckVersion(cncVersion);
                        ManyToOneRingBuffer toDriverBuffer = new ManyToOneRingBuffer(CncFileDescriptor.CreateToDriverBuffer(cncByteBuffer, cncMetaDataBuffer));
                        long clientId = toDriverBuffer.NextCorrelationId();
                        return new DriverProxy(toDriverBuffer, clientId).TerminateDriver(tokenBuffer, tokenOffset, tokenLength);
                    }
                }
                finally
                {
                    cncByteBuffer?.Dispose();
                }

                return false;
            }

            /// <summary>Writes the error log of this context's CnC file to <paramref name="writer"/>.</summary>
            /// <returns>Number of distinct errors written; zero when the CnC file cannot be mapped.</returns>
            public int SaveErrorLog(StreamWriter writer)
            {
                MappedByteBuffer cncByteBuffer = MapExistingCncFile(null);
                try
                {
                    return SaveErrorLog(writer, cncByteBuffer);
                }
                finally
                {
                    cncByteBuffer?.Dispose();
                }
            }

            /// <summary>Writes the error log held in <paramref name="cncByteBuffer"/> to <paramref name="writer"/>.</summary>
            /// <returns>Number of distinct errors written; zero when <paramref name="cncByteBuffer"/> is null.</returns>
            public int SaveErrorLog(StreamWriter writer, MappedByteBuffer cncByteBuffer)
            {
                if (cncByteBuffer == null)
                {
                    return 0;
                }

                return PrintErrorLog(ErrorLogBuffer(cncByteBuffer), writer);
            }

            /// <summary>
            /// Prints every distinct error in <paramref name="errorBuffer"/> followed by a summary line.
            /// Nothing is written when the buffer holds no errors.
            /// </summary>
            /// <param name="errorBuffer">Buffer containing the error log.</param>
            /// <param name="out">Destination writer.</param>
            /// <returns>Number of distinct errors observed.</returns>
            public static int PrintErrorLog(IAtomicBuffer errorBuffer, TextWriter @out)
            {
                int distinctErrorCount = 0;
                if (ErrorLogReader.HasErrors(errorBuffer))
                {
                    distinctErrorCount = ErrorLogReader.Read(errorBuffer, new ErrorConsumer(PrintObservation));
                    @out.WriteLine();
                    @out.WriteLine("{0} distinct errors observed.", distinctErrorCount);
                }

                return distinctErrorCount;

                void PrintObservation(int count, long firstTimestamp, long lastTimestamp, string encodedException)
                {
                    // The timestamps are converted as Unix epoch milliseconds. The previous new DateTime(ticks)
                    // read them as 100 ns ticks and printed dates in year 0001.
                    // NOTE(review): assumes the error log records epoch milliseconds - confirm against the writer.
                    DateTimeOffset firstObserved = DateTimeOffset.FromUnixTimeMilliseconds(firstTimestamp);
                    DateTimeOffset lastObserved = DateTimeOffset.FromUnixTimeMilliseconds(lastTimestamp);

                    @out.WriteLine();
                    @out.WriteLine($"{count} observations from {firstObserved} to {lastObserved} for:");
                    @out.WriteLine(encodedException);
                }
            }

            /// <summary>Validates the CnC version and returns the error-log section of the mapped CnC file.</summary>
            private static IAtomicBuffer ErrorLogBuffer(MappedByteBuffer cncByteBuffer)
            {
                UnsafeBuffer cncMetaDataBuffer = CncFileDescriptor.CreateMetaDataBuffer(cncByteBuffer);
                CncFileDescriptor.CheckVersion(cncMetaDataBuffer.GetInt(CncFileDescriptor.CncVersionOffset(0)));
                return CncFileDescriptor.CreateErrorLogBuffer(cncByteBuffer, cncMetaDataBuffer);
            }

            /// <summary>
            /// Builds an error handler that records every exception in <paramref name="errorLog"/> and
            /// then, when one is supplied, forwards it to <paramref name="userErrorHandler"/>.
            /// </summary>
            /// <param name="userErrorHandler">Optional handler invoked after logging; may be null.</param>
            /// <param name="errorLog">Log that receives every exception.</param>
            public static ErrorHandler SetupErrorHandler(ErrorHandler userErrorHandler, DistinctErrorLog errorLog)
            {
                LoggingErrorHandler loggingErrorHandler = new LoggingErrorHandler(errorLog);
                if (userErrorHandler == null)
                {
                    return loggingErrorHandler.OnError;
                }

                return throwable =>
                {
                    loggingErrorHandler.OnError(throwable);
                    userErrorHandler(throwable);
                };
            }
        }

        public const int NULL_VALUE = -1;

        private readonly AtomicBoolean _isClosed = new AtomicBoolean(false);

        private readonly long _clientId;

        private readonly ClientConductor _conductor;

        private readonly IRingBuffer _commandBuffer;

        private readonly AgentInvoker _conductorInvoker;

        private readonly AgentRunner _conductorRunner;

        private readonly Context _ctx;

        /// <summary>True once the client has been closed.</summary>
        public bool IsClosed => _isClosed.Get();

        /// <summary>The context this client was created with.</summary>
        public Context Ctx => _ctx;

        /// <summary>Client id taken from the context.</summary>
        public long ClientId => _clientId;

        /// <summary>The conductor's agent invoker; null when the conductor runs on its own thread.</summary>
        public AgentInvoker ConductorAgentInvoker => _conductorInvoker;

        /// <summary>Reader over the conductor's counters.</summary>
        /// <exception cref="AeronException">The client is closed.</exception>
        public CountersReader CountersReader
        {
            get
            {
                if (_isClosed.Get())
                {
                    throw new AeronException("client is closed");
                }

                return _conductor.CountersReader();
            }
        }

        /// <summary>
        /// Concludes <paramref name="ctx"/> and creates the conductor together with either an agent
        /// invoker or an agent runner for it. The context is disposed if construction fails for any
        /// reason other than a concurrent conclude.
        /// </summary>
        internal Aeron(Context ctx)
        {
            try
            {
                ctx.Conclude();

                _ctx = ctx;
                _clientId = ctx.ClientId();
                _commandBuffer = ctx.ToDriverBuffer();
                _conductor = new ClientConductor(ctx, this);

                if (ctx.UseConductorAgentInvoker())
                {
                    _conductorInvoker = new AgentInvoker(ctx.ErrorHandler(), (AtomicCounter)null, _conductor);
                    _conductorRunner = null;
                }
                else
                {
                    _conductorInvoker = null;
                    _conductorRunner = new AgentRunner(ctx.IdleStrategy(), ctx.ErrorHandler(), (AtomicCounter)null, _conductor);
                }
            }
            catch (ConcurrentConcludeException)
            {
                // Rethrown without disposing the context.
                throw;
            }
            catch (Exception)
            {
                CloseHelper.QuietDispose(ctx);
                throw;
            }
        }

        /// <summary>Connects with a default <see cref="Context"/>.</summary>
        public static Aeron Connect()
        {
            return Connect(new Context());
        }

        /// <summary>
        /// Creates a client for <paramref name="ctx"/> and starts its conductor: on the agent invoker
        /// when the context asks for one, otherwise on a thread from the context's thread factory.
        /// </summary>
        /// <param name="ctx">Configuration; disposed if connecting fails for any reason other than a concurrent conclude.</param>
        public static Aeron Connect(Context ctx)
        {
            try
            {
                Aeron aeron = new Aeron(ctx);

                if (ctx.UseConductorAgentInvoker())
                {
                    aeron.ConductorAgentInvoker.Start();
                }
                else
                {
                    AgentRunner.StartOnThread(aeron._conductorRunner, ctx.ThreadFactory());
                }

                return aeron;
            }
            catch (ConcurrentConcludeException)
            {
                throw;
            }
            catch (Exception)
            {
                ctx.Dispose();
                throw;
            }
        }

        /// <summary>Writes one line per counter (id, value, label) to <paramref name="out"/>.</summary>
        public void PrintCounters(StreamWriter @out)
        {
            CountersReader.ForEach((CounterConsumer)delegate(long value, int id, string label)
            {
                @out.WriteLine("{0,3}: {1:} - {2}", id, value, label);
            });
        }

        /// <summary>Delegates to the conductor to check whether the command with this correlation id is active.</summary>
        public bool IsCommandActive(long correlationId)
        {
            return _conductor.IsCommandActive(correlationId);
        }

        /// <summary>Delegates to the conductor to check whether any command is active.</summary>
        public bool HasActiveCommands()
        {
            return _conductor.HasActiveCommands();
        }

        /// <summary>
        /// Closes the client once: the first call disposes the conductor runner or invoker, and
        /// later calls do nothing.
        /// </summary>
        public void Dispose()
        {
            if (!_isClosed.CompareAndSet(false, true))
            {
                return;
            }

            ErrorHandler errorHandler = _ctx.ErrorHandler();
            if (_conductorRunner != null)
            {
                CloseHelper.Dispose(errorHandler, _conductorRunner);
            }
            else
            {
                CloseHelper.Dispose(errorHandler, _conductorInvoker);
            }
        }

        /// <summary>Adds a publication through the conductor.</summary>
        public Publication AddPublication(string channel, int streamId)
        {
            return _conductor.AddPublication(channel, streamId);
        }

        /// <summary>Adds an exclusive publication through the conductor.</summary>
        public ExclusivePublication AddExclusivePublication(string channel, int streamId)
        {
            return _conductor.AddExclusivePublication(channel, streamId);
        }

        /// <summary>Starts adding a publication asynchronously; returns the conductor's result for the request.</summary>
        public long AsyncAddPublication(string channel, int streamId)
        {
            return _conductor.AsyncAddPublication(channel, streamId);
        }

        /// <summary>Removes the publication with the given registration id through the conductor.</summary>
        public void AsyncRemovePublication(long registrationId)
        {
            _conductor.RemovePublication(registrationId);
        }

        /// <summary>Starts adding an exclusive publication asynchronously; returns the conductor's result for the request.</summary>
        public long AsyncAddExclusivePublication(string channel, int streamId)
        {
            return _conductor.AsyncAddExclusivePublication(channel, streamId);
        }

        /// <summary>Looks up a publication by registration id through the conductor.</summary>
        public ConcurrentPublication GetPublication(long registrationId)
        {
            return _conductor.GetPublication(registrationId);
        }

        /// <summary>Looks up an exclusive publication by registration id through the conductor.</summary>
        public ExclusivePublication GetExclusivePublication(long registrationId)
        {
            return _conductor.GetExclusivePublication(registrationId);
        }

        /// <summary>Adds a subscription through the conductor.</summary>
        public Subscription AddSubscription(string channel, int streamId)
        {
            return _conductor.AddSubscription(channel, streamId);
        }

        /// <summary>Adds a subscription with explicit image handlers through the conductor.</summary>
        public Subscription AddSubscription(string channel, int streamId, AvailableImageHandler availableImageHandler, UnavailableImageHandler unavailableImageHandler)
        {
            return _conductor.AddSubscription(channel, streamId, availableImageHandler, unavailableImageHandler);
        }

        /// <summary>Next correlation id from the to-driver command buffer.</summary>
        /// <exception cref="AeronException">The client is closed.</exception>
        public long NextCorrelationId()
        {
            if (_isClosed.Get())
            {
                throw new AeronException("client is closed");
            }

            return _commandBuffer.NextCorrelationId();
        }

        /// <summary>Adds a counter with a key and label taken from buffers, through the conductor.</summary>
        public Counter AddCounter(int typeId, IDirectBuffer keyBuffer, int keyOffset, int keyLength, IDirectBuffer labelBuffer, int labelOffset, int labelLength)
        {
            return _conductor.AddCounter(typeId, keyBuffer, keyOffset, keyLength, labelBuffer, labelOffset, labelLength);
        }

        /// <summary>Adds a counter with a string label through the conductor.</summary>
        public Counter AddCounter(int typeId, string label)
        {
            return _conductor.AddCounter(typeId, label);
        }

        /// <summary>Registers an available-counter handler; returns the conductor's result for the registration.</summary>
        public long AddAvailableCounterHandler(AvailableCounterHandler handler)
        {
            return _conductor.AddAvailableCounterHandler(handler);
        }

        /// <summary>Removes an available-counter handler by registration id.</summary>
        public bool RemoveAvailableCounterHandler(long registrationId)
        {
            return _conductor.RemoveAvailableCounterHandler(registrationId);
        }

        /// <summary>Removes an available-counter handler by instance.</summary>
        [Obsolete]
        public bool RemoveAvailableCounterHandler(AvailableCounterHandler handler)
        {
            return _conductor.RemoveAvailableCounterHandler(handler);
        }

        /// <summary>Registers an unavailable-counter handler; returns the conductor's result for the registration.</summary>
        public long AddUnavailableCounterHandler(UnavailableCounterHandler handler)
        {
            return _conductor.AddUnavailableCounterHandler(handler);
        }

        /// <summary>Removes an unavailable-counter handler by registration id.</summary>
        public bool RemoveUnavailableCounterHandler(long registrationId)
        {
            return _conductor.RemoveUnavailableCounterHandler(registrationId);
        }

        /// <summary>Removes an unavailable-counter handler by instance.</summary>
        [Obsolete]
        public bool RemoveUnavailableCounterHandler(UnavailableCounterHandler handler)
        {
            return _conductor.RemoveUnavailableCounterHandler(handler);
        }

        /// <summary>Registers a close handler; returns the conductor's result for the registration.</summary>
        public long AddCloseHandler(Action handler)
        {
            return _conductor.AddCloseHandler(handler);
        }

        /// <summary>Removes a close handler by instance.</summary>
        [Obsolete]
        public bool RemoveCloseHandler(Action handler)
        {
            return _conductor.RemoveCloseHandler(handler);
        }

        /// <summary>Removes a close handler by registration id.</summary>
        public bool RemoveCloseHandler(long registrationId)
        {
            return _conductor.RemoveCloseHandler(registrationId);
        }

        /// <summary>Marks the client as closed without disposing the conductor runner or invoker.</summary>
        internal void InternalClose()
        {
            _isClosed.Set(true);
        }

        /// <summary>
        /// Sleeps the current thread. An interrupt is re-raised on the thread and reported as an
        /// <see cref="AeronException"/>.
        /// </summary>
        private static void Sleep(int durationMs)
        {
            try
            {
                Thread.Sleep(durationMs);
            }
            catch (ThreadInterruptedException innerException)
            {
                Thread.CurrentThread.Interrupt();
                throw new AeronException("unexpected interrupt", innerException);
            }
        }
    }

    public static class AeronCounters
    {
        public const int DRIVER_SYSTEM_COUNTER_TYPE_ID = 0;

        public const int DRIVER_PUBLISHER_LIMIT_TYPE_ID = 1;

        public const int DRIVER_SENDER_POSITION_TYPE_ID = 2;

        public const int DRIVER_RECEIVER_HWM_TYPE_ID = 3;

        public const int
DRIVER_SUBSCRIBER_POSITION_TYPE_ID = 4;
        public const int DRIVER_RECEIVER_POS_TYPE_ID = 5;
        public const int DRIVER_SEND_CHANNEL_STATUS_TYPE_ID = 6;
        public const int DRIVER_RECEIVE_CHANNEL_STATUS_TYPE_ID = 7;
        // NOTE(review): no constant with value 8 is declared in this list.
        public const int DRIVER_SENDER_LIMIT_TYPE_ID = 9;
        public const int DRIVER_PER_IMAGE_TYPE_ID = 10;
        public const int DRIVER_HEARTBEAT_TYPE_ID = 11;
        public const int DRIVER_PUBLISHER_POS_TYPE_ID = 12;
        public const int DRIVER_SENDER_BPE_TYPE_ID = 13;
        public const int NAME_RESOLVER_NEIGHBORS_COUNTER_TYPE_ID = 15;
        public const int NAME_RESOLVER_CACHE_ENTRIES_COUNTER_TYPE_ID = 16;
        // Declared out of numeric order: 14 appears after 15 and 16.
        public const int DRIVER_LOCAL_SOCKET_ADDRESS_STATUS_TYPE_ID = 14;
        public const int FLOW_CONTROL_RECEIVERS_COUNTER_TYPE_ID = 17;
        public const int MDC_DESTINATIONS_COUNTER_TYPE_ID = 18;

        // Type ids carrying the ARCHIVE_ prefix start at 100.
        public const int ARCHIVE_RECORDING_POSITION_TYPE_ID = 100;
        public const int ARCHIVE_ERROR_COUNT_TYPE_ID = 101;
        public const int ARCHIVE_CONTROL_SESSIONS_TYPE_ID = 102;
        public const int ARCHIVE_MAX_CYCLE_TIME_TYPE_ID = 103;
        public const int ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID = 104;

        // Type ids carrying the CLUSTER_ prefix start at 200.
        public const int CLUSTER_CONSENSUS_MODULE_STATE_TYPE_ID = 200;
        public const int CLUSTER_NODE_ROLE_TYPE_ID = 201;
        public const int CLUSTER_CONTROL_TOGGLE_TYPE_ID = 202;
        public const int CLUSTER_COMMIT_POSITION_TYPE_ID = 203;
        public const int CLUSTER_RECOVERY_STATE_TYPE_ID = 204;
        public const int CLUSTER_SNAPSHOT_COUNTER_TYPE_ID = 205;
        // NOTE(review): no constant with value 206 is declared in this list.
        public const int CLUSTER_ELECTION_STATE_TYPE_ID = 207;
        public const int CLUSTER_BACKUP_STATE_TYPE_ID = 208;
        public const int CLUSTER_BACKUP_LIVE_LOG_POSITION_TYPE_ID = 209;
        public const int CLUSTER_BACKUP_QUERY_DEADLINE_TYPE_ID = 210;
        public const int CLUSTER_BACKUP_ERROR_COUNT_TYPE_ID = 211;
        public const int CLUSTER_CONSENSUS_MODULE_ERROR_COUNT_TYPE_ID = 212;
        public const int CLUSTER_CLIENT_TIMEOUT_COUNT_TYPE_ID = 213;
        public const int CLUSTER_INVALID_REQUEST_COUNT_TYPE_ID = 214;
        public const int CLUSTER_CLUSTERED_SERVICE_ERROR_COUNT_TYPE_ID = 215;
        public const int
CLUSTER_MAX_CYCLE_TIME_TYPE_ID = 216;
        public const int CLUSTER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID = 217;
        public const int CLUSTER_CLUSTERED_SERVICE_MAX_CYCLE_TIME_TYPE_ID = 218;
        public const int CLUSTER_CLUSTERED_SERVICE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID = 219;
        public const int CLUSTER_STANDBY_STATE_TYPE_ID = 220;
        public const int CLUSTER_STANDBY_ERROR_COUNT_TYPE_ID = 221;
        public const int CLUSTER_STANDBY_HEARTBEAT_RESPONSE_COUNT_TYPE_ID = 222;

        /// <summary>
        /// Checks that the counter identified by <paramref name="counterId"/> has the expected type id.
        /// </summary>
        /// <exception cref="ConfigurationException">if the type id read from the reader differs.</exception>
        public static void ValidateCounterTypeId(CountersReader countersReader, int counterId, int expectedCounterTypeId)
        {
            int actualTypeId = countersReader.GetCounterTypeId(counterId);
            if (actualTypeId == expectedCounterTypeId)
            {
                return;
            }

            throw new ConfigurationException("The type for counterId=" + counterId + ", typeId=" + actualTypeId + " does not match the expected=" + expectedCounterTypeId);
        }

        /// <summary>
        /// Overload that takes the reader from <paramref name="aeron"/> and the id from
        /// <paramref name="counter"/>.
        /// </summary>
        public static void ValidateCounterTypeId(Aeron aeron, Counter counter, int expectedCounterTypeId)
        {
            int counterId = ((AtomicCounter)counter).Id;
            ValidateCounterTypeId(aeron.CountersReader, counterId, expectedCounterTypeId);
        }
    }

    public class AeronThrowHelper
    {
        /// <summary>Throws an <see cref="AeronException"/> carrying <paramref name="message"/>.</summary>
        [MethodImpl(MethodImplOptions.NoInlining)]
        public static void ThrowAeronException(string message)
        {
            AeronException exception = GetAeronException(message);
            throw exception;
        }

        [MethodImpl(MethodImplOptions.NoInlining)]
        private static AeronException GetAeronException(string message) => new AeronException(message);
    }

    public delegate void AvailableCounterHandler(CountersReader countersReader, long registrationId, int counterId);

    /// <summary>
    /// Growable byte buffer that tracks a limit (bytes appended so far) and a next-term-offset marker.
    /// </summary>
    public sealed class BufferBuilder
    {
        internal const int MAX_CAPACITY = 2147483639;
        internal const int INIT_MIN_CAPACITY = 4096;

        private readonly UnsafeBuffer _buffer;
        private int _limit;
        private int _nextTermOffset;

        /// <summary>Creates a builder with zero initial capacity.</summary>
        public BufferBuilder()
            : this(0)
        {
        }

        /// <summary>Creates a builder backed by a new array of <paramref name="initialCapacity"/> bytes.</summary>
        /// <exception cref="ArgumentException">if the capacity is negative or above the maximum.</exception>
        public BufferBuilder(int initialCapacity)
        {
            bool inRange = initialCapacity >= 0 && initialCapacity <= MAX_CAPACITY;
            if (!inRange)
            {
                throw new ArgumentException("initialCapacity outside range 0 - " + MAX_CAPACITY + ": initialCapacity=" + initialCapacity);
            }

            _buffer = new UnsafeBuffer(new byte[initialCapacity]);
        }

        public int Capacity() => _buffer.Capacity;

        public int Limit() => _limit;

        /// <summary>Sets the limit; it must be non-negative and strictly below the current capacity.</summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Limit(int limit)
        {
            if (limit < 0 || limit >= _buffer.Capacity)
            {
                ThrowHelper.ThrowArgumentException($"limit outside range: capacity={_buffer.Capacity:D} limit={limit:D}");
            }

            _limit = limit;
        }

        public int NextTermOffset() => _nextTermOffset;

        public void NextTermOffset(int offset)
        {
            _nextTermOffset = offset;
        }

        public IMutableDirectBuffer Buffer() => (IMutableDirectBuffer)(object)_buffer;

        /// <summary>Zeroes the limit and the next-term-offset; capacity is left untouched.</summary>
        public BufferBuilder Reset()
        {
            _nextTermOffset = 0;
            _limit = 0;
            return this;
        }

        /// <summary>Re-wraps the content in an array sized to the larger of the limit and the minimum capacity.</summary>
        public BufferBuilder Compact()
        {
            int targetCapacity = Math.Max(INIT_MIN_CAPACITY, _limit);
            Resize(targetCapacity);
            return this;
        }

        /// <summary>Copies <paramref name="length"/> bytes from the source at the limit and advances the limit.</summary>
        public BufferBuilder Append(IDirectBuffer srcBuffer, int srcOffset, int length)
        {
            EnsureCapacity(length);

            srcBuffer.GetBytes(srcOffset, (IMutableDirectBuffer)(object)_buffer, _limit, length);
            _limit += length;

            return this;
        }

        private void EnsureCapacity(int additionalLength)
        {
            // Computed as long so limit + additionalLength cannot wrap around int.
            long required = (long)_limit + (long)additionalLength;
            int currentCapacity = _buffer.Capacity;

            if (required <= currentCapacity)
            {
                return;
            }

            if (required > MAX_CAPACITY)
            {
                throw new InvalidOperationException("insufficient capacity: maxCapacity=" + MAX_CAPACITY + " limit=" + _limit + " additionalLength=" + additionalLength);
            }

            Resize(FindSuitableCapacity(currentCapacity, required));
        }

        private void Resize(int newCapacity)
        {
            _buffer.Wrap(CopyOf(_buffer.ByteArray, newCapacity));
        }

        private static T[] CopyOf<T>(T[] original, int newLength)
        {
            int count = Math.Min(original.Length, newLength);
            T[] copy = new T[newLength];
            Array.Copy(original, 0, copy, 0, count);
            return copy;
        }

        /// <summary>
        /// Grows from max(capacity, minimum) by half of the current value per step, clamped to the
        /// maximum, until the required capacity is reached.
        /// </summary>
        internal static int FindSuitableCapacity(int capacity, long requiredCapacity)
        {
            long candidate = Math.Max(capacity, INIT_MIN_CAPACITY);

            while (candidate < requiredCapacity)
            {
                candidate += candidate >> 1;
                if (candidate > MAX_CAPACITY)
                {
                    candidate = MAX_CAPACITY;
                }
            }

            return (int)candidate;
        }
    }

    public sealed class ChannelUri
{
        /// <summary>Parser states used by <see cref="Parse"/>.</summary>
        private enum State
        {
            MEDIA,
            PARAMS_KEY,
            PARAMS_VALUE
        }

        public const string AERON_SCHEME = "aeron";
        public const string SPY_QUALIFIER = "aeron-spy";
        public const long INVALID_TAG = -1L;

        private const int CHANNEL_TAG_INDEX = 0;
        private const int ENTITY_TAG_INDEX = 1;
        private static readonly string AERON_PREFIX = "aeron:";

        private string _prefix;
        private string _media;
        private readonly Map<string, string> _params;
        private readonly string[] _tags;

        public bool IsUdp => "udp".Equals(_media);

        public bool IsIpc => "ipc".Equals(_media);

        /// <summary>
        /// Creates a URI from its parts. The "tags" parameter, if present, is split on ',' once here;
        /// later changes to the parameter map do not refresh the cached tags.
        /// </summary>
        public ChannelUri(string prefix, string media, Map<string, string> @params)
        {
            _prefix = prefix;
            _media = media;
            _params = @params;
            _tags = SplitTags(_params.Get("tags"));
        }

        public string Prefix()
        {
            return _prefix;
        }

        public ChannelUri Prefix(string prefix)
        {
            _prefix = prefix;
            return this;
        }

        public string Media()
        {
            return _media;
        }

        /// <summary>Sets the media after validating it is "ipc" or "udp".</summary>
        /// <exception cref="ArgumentException">for any other media.</exception>
        public ChannelUri Media(string media)
        {
            ValidateMedia(media);
            _media = media;
            return this;
        }

        public string Scheme()
        {
            return AERON_SCHEME;
        }

        public string Get(string key)
        {
            return _params.Get(key);
        }

        /// <summary>Value for <paramref name="key"/>, or <paramref name="defaultValue"/> when the lookup yields null.</summary>
        public string Get(string key, string defaultValue)
        {
            string value = _params.Get(key);
            return value != null ? value : defaultValue;
        }

        public string Put(string key, string value)
        {
            return _params.Put(key, value);
        }

        public string Remove(string key)
        {
            return _params.Remove(key);
        }

        public bool ContainsKey(string key)
        {
            return _params.ContainsKey(key);
        }

        /// <summary>First entry of the "tags" parameter, or null when there is none.</summary>
        public string ChannelTag()
        {
            return _tags != null && _tags.Length > CHANNEL_TAG_INDEX ? _tags[CHANNEL_TAG_INDEX] : null;
        }

        /// <summary>Second entry of the "tags" parameter, or null when there is none.</summary>
        public string EntityTag()
        {
            return _tags.Length > ENTITY_TAG_INDEX ? _tags[ENTITY_TAG_INDEX] : null;
        }

        private bool Equals(ChannelUri other)
        {
            // NOTE(review): parameter maps are compared with object.Equals; whether that is a
            // structural comparison depends on Map's own Equals, which is not visible here.
            return _prefix == other._prefix
                && _media == other._media
                && object.Equals(_params, other._params)
                && TagsEqual(_tags, other._tags);
        }

        /// <summary>
        /// Two URIs are equal when prefix, media, parameters and tags match. Tags are compared
        /// element by element; comparing the arrays themselves would only test reference identity.
        /// </summary>
        public override bool Equals(object obj)
        {
            if (ReferenceEquals(this, obj))
            {
                return true;
            }

            return obj is ChannelUri other && Equals(other);
        }

        /// <summary>Hash consistent with <see cref="Equals(object)"/>: tags contribute by content.</summary>
        public override int GetHashCode()
        {
            unchecked
            {
                int hash = _prefix != null ? _prefix.GetHashCode() : 0;
                hash = (hash * 397) ^ (_media != null ? _media.GetHashCode() : 0);
                hash = (hash * 397) ^ (_params != null ? ((object)_params).GetHashCode() : 0);
                hash = (hash * 397) ^ TagsHashCode(_tags);
                return hash;
            }
        }

        /// <summary>Element-wise, ordinal comparison of two tag arrays; two nulls are equal.</summary>
        private static bool TagsEqual(string[] left, string[] right)
        {
            if (ReferenceEquals(left, right))
            {
                return true;
            }

            if (left == null || right == null || left.Length != right.Length)
            {
                return false;
            }

            for (int i = 0; i < left.Length; i++)
            {
                if (!string.Equals(left[i], right[i]))
                {
                    return false;
                }
            }

            return true;
        }

        /// <summary>Hash over the tag strings in order; 0 for a null array.</summary>
        private static int TagsHashCode(string[] tags)
        {
            if (tags == null)
            {
                return 0;
            }

            unchecked
            {
                int hash = 17;
                foreach (string tag in tags)
                {
                    hash = (hash * 397) ^ (tag != null ? tag.GetHashCode() : 0);
                }

                return hash;
            }
        }

        /// <summary>
        /// Renders "[prefix:]aeron:media[?key=value|key=value...]" from the current state.
        /// </summary>
        public override string ToString()
        {
            StringBuilder sb;
            if (string.IsNullOrEmpty(_prefix))
            {
                sb = new StringBuilder(_params.Count * 20 + 10);
            }
            else
            {
                sb = new StringBuilder(_params.Count * 20 + 20);
                sb.Append(_prefix);

                // Ordinal, like the other EndsWith calls in this class: this is a protocol delimiter.
                if (!_prefix.EndsWith(":", StringComparison.Ordinal))
                {
                    sb.Append(":");
                }
            }

            sb.Append(AERON_PREFIX).Append(_media);

            if (_params.Count > 0)
            {
                sb.Append('?');

                foreach (KeyValuePair<string, string> param in _params)
                {
                    sb.Append(param.Key).Append('=').Append(param.Value).Append('|');
                }

                // Drop the trailing '|' appended after the last parameter.
                sb.Length--;
            }

            return sb.ToString();
        }

        /// <summary>
        /// Stores "init-term-id", "term-id", "term-offset" and "term-length" parameters derived from
        /// a position.
        /// </summary>
        /// <exception cref="ArgumentException">if the position is negative or not a multiple of 32.</exception>
        public void InitialPosition(long position, int initialTermId, int termLength)
        {
            if (position < 0 || (position & 0x1F) != 0L)
            {
                throw new ArgumentException("invalid position: " + position);
            }

            int positionBitsToShift = LogBufferDescriptor.PositionBitsToShift(termLength);
            int termId = LogBufferDescriptor.ComputeTermIdFromPosition(position, positionBitsToShift, initialTermId);
            int termOffset = (int)(position & (termLength - 1));

            Put("init-term-id", Convert.ToString(initialTermId));
            Put("term-id", Convert.ToString(termId));
            Put("term-offset", Convert.ToString(termOffset));
            Put("term-length", Convert.ToString(termLength));
        }

        /// <summary>
        /// Parses "[aeron-spy:]aeron:media[?key=value|...]" with a small state machine over the
        /// characters that follow the scheme.
        /// </summary>
        /// <exception cref="ArgumentException">
        /// if the scheme is missing, a delimiter appears in the wrong place, a key is empty, the input
        /// ends inside a key, or a parameterless media is not "ipc"/"udp".
        /// </exception>
        public static ChannelUri Parse(string cs)
        {
            int position = 0;
            string prefix;
            if (StartsWith(cs, 0, "aeron-spy:"))
            {
                prefix = "aeron-spy";
                position = "aeron-spy:".Length;
            }
            else
            {
                prefix = "";
            }

            if (!StartsWith(cs, position, AERON_PREFIX))
            {
                throw new ArgumentException("Aeron URIs must start with 'aeron:', found: " + cs);
            }

            position += AERON_PREFIX.Length;

            StringBuilder builder = new StringBuilder();
            Map<string, string> parameters = new Map<string, string>((string)null);
            string media = null;
            string key = null;
            State state = State.MEDIA;

            for (int i = position, length = cs.Length; i < length; i++)
            {
                char c = cs[i];
                switch (state)
                {
                    case State.MEDIA:
                        switch (c)
                        {
                            case '?':
                                media = builder.ToString();
                                builder.Length = 0;
                                state = State.PARAMS_KEY;
                                break;

                            case ':':
                            case '=':
                            case '|':
                                throw new ArgumentException("encountered '" + c + "' within media definition at index " + i + " in " + cs);

                            default:
                                builder.Append(c);
                                break;
                        }

                        break;

                    case State.PARAMS_KEY:
                        switch (c)
                        {
                            case '=':
                                if (builder.Length == 0)
                                {
                                    throw new ArgumentException("empty key not allowed at index " + i + " in " + cs);
                                }

                                key = builder.ToString();
                                builder.Length = 0;
                                state = State.PARAMS_VALUE;
                                break;

                            case '|':
                                throw new ArgumentException("invalid end of key at index " + i + " in " + cs);

                            default:
                                builder.Append(c);
                                break;
                        }

                        break;

                    case State.PARAMS_VALUE:
                        if (c == '|')
                        {
                            parameters.Put(key, builder.ToString());
                            builder.Length = 0;
                            state = State.PARAMS_KEY;
                        }
                        else
                        {
                            builder.Append(c);
                        }

                        break;

                    default:
                        throw new ArgumentException("unexpected state=" + state.ToString() + " in " + cs);
                }
            }

            // Flush whatever was being accumulated when the input ran out.
            switch (state)
            {
                case State.MEDIA:
                    media = builder.ToString();
                    ValidateMedia(media);
                    break;

                case State.PARAMS_VALUE:
                    parameters.Put(key, builder.ToString());
                    break;

                default:
                    throw new ArgumentException("no more input found, state=" + state.ToString() + " in " + cs);
            }

            return new ChannelUri(prefix, media, parameters);
        }

        /// <summary>Parses <paramref name="channel"/>, sets its "session-id" and renders it again.</summary>
        public static string AddSessionId(string channel, int sessionId)
        {
            ChannelUri channelUri = Parse(channel);
            channelUri.Put("session-id", Convert.ToString(sessionId));
            return channelUri.ToString();
        }

        public static bool IsTagged(string paramValue)
        {
            return StartsWith(paramValue, 0, "tag:");
        }

        /// <summary>Numeric part of a "tag:N" value, or <see cref="INVALID_TAG"/> when not tagged.</summary>
        public static long GetTag(string paramValue)
        {
            if (!IsTagged(paramValue))
            {
                return INVALID_TAG;
            }

            return long.Parse(paramValue.Substring(4, paramValue.Length - 4));
        }

        /// <summary>
        /// Builds "aeron:media?endpoint=..." from the media of <paramref name="channel"/>, carrying
        /// over its "interface" parameter when present.
        /// </summary>
        public static string CreateDestinationUri(string channel, string endpoint)
        {
            ChannelUri channelUri = Parse(channel);
            string uri = AERON_PREFIX + channelUri.Media() + "?endpoint=" + endpoint;

            string networkInterface = channelUri.Get("interface");
            if (networkInterface != null)
            {
                return uri + "|interface=" + networkInterface;
            }

            return uri;
        }

        /// <summary>
        /// Sets "endpoint" to <paramref name="resolvedEndpoint"/> when absent, or swaps a trailing
        /// ":0" on the existing endpoint for the resolved port. Other endpoints are left as they are.
        /// </summary>
        /// <exception cref="ArgumentNullException">if <paramref name="resolvedEndpoint"/> is null.</exception>
        /// <exception cref="ArgumentException">if it has no port or ends with ":0".</exception>
        public void ReplaceEndpointWildcardPort(string resolvedEndpoint)
        {
            if (resolvedEndpoint == null)
            {
                throw new ArgumentNullException(nameof(resolvedEndpoint), "resolvedEndpoint is null");
            }

            int portSeparatorIndex = resolvedEndpoint.LastIndexOf(':');
            if (-1 == portSeparatorIndex)
            {
                throw new ArgumentException("No port specified on resolvedEndpoint=" + resolvedEndpoint);
            }

            if (resolvedEndpoint.EndsWith(":0", StringComparison.Ordinal))
            {
                throw new ArgumentException("Wildcard port specified on resolvedEndpoint=" + resolvedEndpoint);
            }

            string existingEndpoint = Get("endpoint");
            if (existingEndpoint == null)
            {
                Put("endpoint", resolvedEndpoint);
            }
            else if (existingEndpoint.EndsWith(":0", StringComparison.Ordinal))
            {
                string host = existingEndpoint.Substring(0, existingEndpoint.Length - 2);
                Put("endpoint", host + resolvedEndpoint.Substring(portSeparatorIndex));
            }
        }

        private static void ValidateMedia(string media)
        {
            if ("ipc".Equals(media) || "udp".Equals(media))
            {
                return;
            }

            throw new ArgumentException("unknown media: " + media);
        }

        /// <summary>Ordinal check that <paramref name="input"/> contains <paramref name="prefix"/> at <paramref name="position"/>.</summary>
        private static bool StartsWith(string input, int position, string prefix)
        {
            if (input.Length - position < prefix.Length)
            {
                return false;
            }

            for (int i = 0; i < prefix.Length; i++)
            {
                if (input[position + i] != prefix[i])
                {
                    return false;
                }
            }

            return true;
        }

        /// <summary>Splits a "tags" value on ','; a null value yields the shared empty array.</summary>
        private static string[] SplitTags(string tagsValue)
        {
            string[] tags = ArrayUtil.EMPTY_STRING_ARRAY;

            if (tagsValue != null)
            {
                int tagCount = CountTags(tagsValue);
                if (tagCount == 1)
                {
                    tags = new string[1] { tagsValue };
                }
                else
                {
                    int tagStartIndex = 0;
                    int tagIndex = 0;
                    tags = new string[tagCount];

                    for (int i = 0, length = tagsValue.Length; i < length; i++)
                    {
                        if (tagsValue[i] == ',')
                        {
                            tags[tagIndex++] = tagsValue.Substring(tagStartIndex, i - tagStartIndex);
                            tagStartIndex = i + 1;

                            // After the last comma the remainder of the string is the final tag.
                            if (tagIndex >= tagCount - 1)
                            {
                                tags[tagIndex] = tagsValue.Substring(tagStartIndex, length - tagStartIndex);
                            }
                        }
                    }
                }
            }

            return tags;
        }

        private static int CountTags(string tags)
        {
            int count = 1;

            for (int i = 0, length = tags.Length; i < length; i++)
            {
                if (tags[i] == ',')
                {
                    count++;
                }
            }

            return count;
        }
    }

    public sealed class ChannelUriStringBuilder
    {
        public const string TAG_PREFIX = "tag:";

        private readonly StringBuilder _sb = new StringBuilder(64);
        private string _prefix;
        private string _media;
        private string _endpoint;
        private string _networkInterface;
        private string _controlEndpoint;
        private string _controlMode;
        private string _tags;
        private string _alias;
        private string _cc;
        private string _fc;
        private bool? _reliable;
        private int? _ttl;
        private int? _mtu;
        private int? _termLength;
        private int? _initialTermId;
        private int? _termId;
        private int? _termOffset;
        private long? _sessionId;
        private long? _groupTag;
        private long? _linger;
        private bool? _sparse;
        private bool? _eos;
        private bool? _tether;
        private bool? _group;
        private bool? _rejoin;
        private bool? _ssc;
        private bool _isSessionIdTagged;
        private int? _socketSndbufLength;
        private int? _socketRcvbufLength;
        private int?
_receiverWindowLength;
        private string _mediaReceiveTimestampOffset;
        private string _channelReceiveTimestampOffset;
        private string _channelSendTimestampOffset;

        /// <summary>Creates an empty builder.</summary>
        public ChannelUriStringBuilder()
        {
        }

        /// <summary>Creates a builder seeded from a URI string, which is parsed first.</summary>
        public ChannelUriStringBuilder(string initialUri)
            : this(ChannelUri.Parse(initialUri))
        {
        }

        /// <summary>
        /// Creates a builder seeded from every parameter of <paramref name="channelUri"/> that this
        /// builder knows about. Each setter applies its own validation, so an invalid value throws here.
        /// </summary>
        public ChannelUriStringBuilder(ChannelUri channelUri)
        {
            _isSessionIdTagged = false;

            Prefix(channelUri);
            Media(channelUri);
            Endpoint(channelUri);
            NetworkInterface(channelUri);
            ControlEndpoint(channelUri);
            ControlMode(channelUri);
            Tags(channelUri);
            Alias(channelUri);
            CongestionControl(channelUri);
            FlowControl(channelUri);
            Reliable(channelUri);
            Ttl(channelUri);
            Mtu(channelUri);
            TermLength(channelUri);
            InitialTermId(channelUri);
            TermId(channelUri);
            TermOffset(channelUri);
            SessionId(channelUri);
            Group(channelUri);
            Linger(channelUri);
            Sparse(channelUri);
            Eos(channelUri);
            Tether(channelUri);
            GroupTag(channelUri);
            Rejoin(channelUri);
            SpiesSimulateConnection(channelUri);
            SocketRcvbufLength(channelUri);
            SocketSndbufLength(channelUri);
            ReceiverWindowLength(channelUri);
            MediaReceiveTimestampOffset(channelUri);
            ChannelReceiveTimestampOffset(channelUri);
            ChannelSendTimestampOffset(channelUri);
        }

        /// <summary>
        /// Resets every URI parameter held by the builder so it can be reused for an unrelated URI.
        /// </summary>
        /// <returns>this builder, for chaining.</returns>
        public ChannelUriStringBuilder Clear()
        {
            _prefix = null;
            _media = null;
            _endpoint = null;
            _networkInterface = null;
            _controlEndpoint = null;
            _controlMode = null;
            _tags = null;
            _alias = null;
            _cc = null;
            _fc = null;
            _reliable = null;
            _ttl = null;
            _mtu = null;
            _termLength = null;
            _initialTermId = null;
            _termId = null;
            _termOffset = null;
            _sessionId = null;
            _groupTag = null;
            _linger = null;
            _sparse = null;
            _eos = null;
            _tether = null;
            _group = null;
            _rejoin = null;
            // Previously left out: without this a reused builder kept the old "ssc" setting.
            _ssc = null;
            _isSessionIdTagged = false;
            _socketRcvbufLength = null;
            _socketSndbufLength = null;
            _receiverWindowLength = null;
            _mediaReceiveTimestampOffset = null;
            _channelReceiveTimestampOffset = null;
            _channelSendTimestampOffset = null;

            return this;
        }

        /// <summary>
        /// Checks the combination of values set so far: media is mandatory, UDP needs an endpoint or
        /// control address, and initialTermId/termId/termOffset must be given together and be mutually
        /// consistent.
        /// </summary>
        /// <returns>this builder, for chaining.</returns>
        /// <exception cref="InvalidOperationException">if media, or both UDP addresses, are missing.</exception>
        /// <exception cref="ArgumentException">if the term parameters are incomplete or inconsistent.</exception>
        public ChannelUriStringBuilder Validate()
        {
            if (_media == null)
            {
                throw new InvalidOperationException("media type is mandatory");
            }

            if ("udp".Equals(_media) && _endpoint == null && _controlEndpoint == null)
            {
                throw new InvalidOperationException("either 'endpoint' or 'control' must be specified for UDP.");
            }

            int termParamCount = 0;
            termParamCount += _initialTermId.HasValue ? 1 : 0;
            termParamCount += _termId.HasValue ? 1 : 0;
            termParamCount += _termOffset.HasValue ? 1 : 0;

            if (termParamCount > 0)
            {
                if (termParamCount < 3)
                {
                    throw new ArgumentException("if any of then a complete set of 'initialTermId', 'termId', and 'termOffset' must be provided");
                }

                // A negative difference means termId is more than 2^31 - 1 ahead of initialTermId
                // once the int subtraction wraps.
                if (_termId - _initialTermId < 0)
                {
                    throw new ArgumentException("difference greater than 2^31 - 1: termId=" + _termId + " - initialTermId=" + _initialTermId);
                }

                if (_termLength.HasValue && _termOffset > _termLength)
                {
                    throw new ArgumentException("termOffset=" + _termOffset + " > termLength=" + _termLength);
                }
            }

            return this;
        }

        /// <summary>Sets the prefix; only null, "" and "aeron-spy" are accepted.</summary>
        /// <exception cref="ArgumentException">for any other prefix.</exception>
        public ChannelUriStringBuilder Prefix(string prefix)
        {
            if (prefix != null && !prefix.Equals("") && !prefix.Equals("aeron-spy"))
            {
                throw new ArgumentException("invalid prefix: " + prefix);
            }

            _prefix = prefix;
            return this;
        }

        public ChannelUriStringBuilder Prefix(ChannelUri channelUri)
        {
            return Prefix(channelUri.Prefix());
        }

        public string Prefix()
        {
            return _prefix;
        }

        /// <summary>Sets the media; only "udp" and "ipc" are accepted.</summary>
        /// <exception cref="ArgumentException">for any other media, including null.</exception>
        public ChannelUriStringBuilder Media(string media)
        {
            if (!(media == "udp") && !(media == "ipc"))
            {
                throw new ArgumentException("invalid media: " + media);
            }

            _media = media;
            return this;
        }

        public ChannelUriStringBuilder Media(ChannelUri channelUri)
        {
            return Media(channelUri.Media());
        }

        public string Media()
        {
            return _media;
        }

        public ChannelUriStringBuilder Endpoint(string endpoint)
        {
            _endpoint = endpoint;
            return this;
        }

        public ChannelUriStringBuilder Endpoint(ChannelUri channelUri)
        {
            return Endpoint(channelUri.Get("endpoint"));
        }

        public string Endpoint()
        {
            return _endpoint;
        }

        public ChannelUriStringBuilder NetworkInterface(string networkInterface)
        {
_networkInterface = networkInterface;
            return this;
        }

        public ChannelUriStringBuilder NetworkInterface(ChannelUri channelUri)
        {
            return NetworkInterface(channelUri.Get("interface"));
        }

        public string NetworkInterface()
        {
            return _networkInterface;
        }

        public ChannelUriStringBuilder ControlEndpoint(string controlEndpoint)
        {
            _controlEndpoint = controlEndpoint;
            return this;
        }

        public ChannelUriStringBuilder ControlEndpoint(ChannelUri channelUri)
        {
            return ControlEndpoint(channelUri.Get("control"));
        }

        public string ControlEndpoint()
        {
            return _controlEndpoint;
        }

        /// <summary>Sets the control mode; only null, "manual" and "dynamic" are accepted.</summary>
        /// <exception cref="ArgumentException">for any other value.</exception>
        public ChannelUriStringBuilder ControlMode(string controlMode)
        {
            if (controlMode != null && !controlMode.Equals("manual") && !controlMode.Equals("dynamic"))
            {
                throw new ArgumentException("invalid control mode: " + controlMode);
            }

            _controlMode = controlMode;
            return this;
        }

        public ChannelUriStringBuilder ControlMode(ChannelUri channelUri)
        {
            return ControlMode(channelUri.Get("control-mode"));
        }

        public string ControlMode()
        {
            return _controlMode;
        }

        public ChannelUriStringBuilder Reliable(bool? isReliable)
        {
            _reliable = isReliable;
            return this;
        }

        /// <summary>Takes "reliable" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder Reliable(ChannelUri channelUri)
        {
            string value = channelUri.Get("reliable");
            if (value == null)
            {
                _reliable = null;
                return this;
            }

            return Reliable(Convert.ToBoolean(value));
        }

        public bool? Reliable()
        {
            return _reliable;
        }

        /// <summary>Sets the TTL; a non-null value must be within 0-255.</summary>
        /// <exception cref="ArgumentException">if the value is out of range.</exception>
        public ChannelUriStringBuilder Ttl(int? ttl)
        {
            if (ttl.HasValue && (ttl < 0 || ttl > 255))
            {
                throw new ArgumentException("TTL not in range 0-255: " + ttl);
            }

            _ttl = ttl;
            return this;
        }

        /// <summary>Takes "ttl" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder Ttl(ChannelUri channelUri)
        {
            string value = channelUri.Get("ttl");
            if (value == null)
            {
                _ttl = null;
                return this;
            }

            return Ttl(Convert.ToInt32(value));
        }

        public int? Ttl()
        {
            return _ttl;
        }

        /// <summary>Sets the MTU; a non-null value must be within 32-65504 and a multiple of 32.</summary>
        /// <exception cref="ArgumentException">if the value is out of range or misaligned.</exception>
        public ChannelUriStringBuilder Mtu(int? mtu)
        {
            if (mtu.HasValue)
            {
                if (mtu < 32 || mtu > 65504)
                {
                    throw new ArgumentException("MTU not in range 32-65504: " + mtu);
                }

                if ((mtu & 0x1F) != 0)
                {
                    throw new ArgumentException("MTU not a multiple of FRAME_ALIGNMENT: mtu=" + mtu);
                }
            }

            _mtu = mtu;
            return this;
        }

        /// <summary>Takes "mtu" (a size string) from the URI; an absent parameter clears the value.</summary>
        /// <exception cref="InvalidOperationException">if the parsed size does not fit in an int.</exception>
        public ChannelUriStringBuilder Mtu(ChannelUri channelUri)
        {
            string value = channelUri.Get("mtu");
            if (value == null)
            {
                _mtu = null;
                return this;
            }

            long mtu = SystemUtil.ParseSize("mtu", value);
            if (mtu > int.MaxValue)
            {
                throw new InvalidOperationException("mtu " + mtu + " > " + int.MaxValue);
            }

            return Mtu((int)mtu);
        }

        public int? Mtu()
        {
            return _mtu;
        }

        /// <summary>Sets the term length; a non-null value is checked by <c>LogBufferDescriptor.CheckTermLength</c>.</summary>
        public ChannelUriStringBuilder TermLength(int? termLength)
        {
            if (termLength.HasValue)
            {
                LogBufferDescriptor.CheckTermLength(termLength.Value);
            }

            _termLength = termLength;
            return this;
        }

        /// <summary>Takes "term-length" (a size string) from the URI; an absent parameter clears the value.</summary>
        /// <exception cref="InvalidOperationException">if the parsed size does not fit in an int.</exception>
        public ChannelUriStringBuilder TermLength(ChannelUri channelUri)
        {
            string value = channelUri.Get("term-length");
            if (value == null)
            {
                _termLength = null;
                return this;
            }

            long termLength = SystemUtil.ParseSize("term-length", value);
            if (termLength > int.MaxValue)
            {
                // Report the value that was just parsed; the field still holds the previous setting.
                const int maxTermLength = 1073741824;
                throw new InvalidOperationException("term length more than max length of " + maxTermLength + ": length=" + termLength);
            }

            return TermLength((int)termLength);
        }

        public int? TermLength()
        {
            return _termLength;
        }

        public ChannelUriStringBuilder InitialTermId(int? initialTermId)
        {
            _initialTermId = initialTermId;
            return this;
        }

        /// <summary>Takes "init-term-id" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder InitialTermId(ChannelUri channelUri)
        {
            string value = channelUri.Get("init-term-id");
            if (value == null)
            {
                _initialTermId = null;
                return this;
            }

            return InitialTermId(Convert.ToInt32(value));
        }

        public int? InitialTermId()
        {
            return _initialTermId;
        }

        public ChannelUriStringBuilder TermId(int? termId)
        {
            _termId = termId;
            return this;
        }

        /// <summary>Takes "term-id" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder TermId(ChannelUri channelUri)
        {
            string value = channelUri.Get("term-id");
            if (value == null)
            {
                _termId = null;
                return this;
            }

            return TermId(Convert.ToInt32(value));
        }

        public int? TermId()
        {
            return _termId;
        }

        public ChannelUriStringBuilder TermOffset(int?
termOffset)
        {
            if (termOffset.HasValue)
            {
                bool outOfRange = termOffset < 0 || termOffset > 1073741824;
                if (outOfRange)
                {
                    throw new ArgumentException("term offset not in range 0-1g: " + termOffset);
                }

                // The low five bits must be clear, i.e. the offset is a multiple of 32.
                if ((termOffset & 0x1F) != 0)
                {
                    throw new ArgumentException("term offset not multiple of FRAME_ALIGNMENT: " + termOffset);
                }
            }

            _termOffset = termOffset;
            return this;
        }

        /// <summary>Takes "term-offset" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder TermOffset(ChannelUri channelUri)
        {
            string raw = channelUri.Get("term-offset");
            if (raw != null)
            {
                return TermOffset(Convert.ToInt32(raw));
            }

            _termOffset = null;
            return this;
        }

        public int? TermOffset() => _termOffset;

        public ChannelUriStringBuilder SessionId(int? sessionId)
        {
            _sessionId = sessionId;
            return this;
        }

        /// <summary>
        /// Sets the session id from its string form: null clears it, "tag:N" stores a tagged id, and
        /// anything else is parsed as a plain int.
        /// </summary>
        public ChannelUriStringBuilder SessionId(string sessionIdStr)
        {
            if (sessionIdStr == null)
            {
                SessionId((int?)null);
                return this;
            }

            if (ChannelUri.IsTagged(sessionIdStr))
            {
                TaggedSessionId(ChannelUri.GetTag(sessionIdStr));
                return this;
            }

            IsSessionIdTagged(isSessionIdTagged: false);
            SessionId(Convert.ToInt32(sessionIdStr));
            return this;
        }

        /// <summary>Stores <paramref name="sessionId"/> and marks the session id as tagged.</summary>
        public ChannelUriStringBuilder TaggedSessionId(long? sessionId)
        {
            IsSessionIdTagged(isSessionIdTagged: true);
            _sessionId = sessionId;
            return this;
        }

        public ChannelUriStringBuilder SessionId(ChannelUri channelUri)
        {
            string raw = channelUri.Get("session-id");
            return SessionId(raw);
        }

        [Obsolete("this method will not correctly handle tagged sessionId values that are outside the range of")]
        public int? SessionId() => (int?)_sessionId;

        /// <summary>Sets the linger time in nanoseconds; a non-null value must not be negative.</summary>
        /// <exception cref="ArgumentException">if the value is negative.</exception>
        public ChannelUriStringBuilder Linger(long? lingerNs)
        {
            if (lingerNs.HasValue && lingerNs < 0)
            {
                throw new ArgumentException("linger value cannot be negative: " + lingerNs);
            }

            _linger = lingerNs;
            return this;
        }

        /// <summary>Takes "linger" (a duration string) from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder Linger(ChannelUri channelUri)
        {
            string raw = channelUri.Get("linger");
            if (raw != null)
            {
                return Linger(SystemUtil.ParseDuration("linger", raw));
            }

            _linger = null;
            return this;
        }

        public long?
Linger() => _linger;

        public ChannelUriStringBuilder Sparse(bool? isSparse)
        {
            _sparse = isSparse;
            return this;
        }

        /// <summary>Takes "sparse" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder Sparse(ChannelUri channelUri)
        {
            string raw = channelUri.Get("sparse");
            if (raw != null)
            {
                return Sparse(Convert.ToBoolean(raw));
            }

            _sparse = null;
            return this;
        }

        public bool? Sparse() => _sparse;

        public ChannelUriStringBuilder Eos(bool? eos)
        {
            _eos = eos;
            return this;
        }

        /// <summary>Takes "eos" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder Eos(ChannelUri channelUri)
        {
            string raw = channelUri.Get("eos");
            if (raw != null)
            {
                return Eos(Convert.ToBoolean(raw));
            }

            _eos = null;
            return this;
        }

        public bool? Eos() => _eos;

        public ChannelUriStringBuilder Tether(bool? tether)
        {
            _tether = tether;
            return this;
        }

        /// <summary>Takes "tether" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder Tether(ChannelUri channelUri)
        {
            string raw = channelUri.Get("tether");
            if (raw != null)
            {
                return Tether(Convert.ToBoolean(raw));
            }

            _tether = null;
            return this;
        }

        public bool? Tether() => _tether;

        public ChannelUriStringBuilder Group(bool? group)
        {
            _group = group;
            return this;
        }

        /// <summary>Takes "group" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder Group(ChannelUri channelUri)
        {
            string raw = channelUri.Get("group");
            if (raw != null)
            {
                return Group(Convert.ToBoolean(raw));
            }

            _group = null;
            return this;
        }

        public bool? Group() => _group;

        public ChannelUriStringBuilder Tags(string tags)
        {
            _tags = tags;
            return this;
        }

        public ChannelUriStringBuilder Tags(ChannelUri channelUri)
        {
            string raw = channelUri.Get("tags");
            return Tags(raw);
        }

        /// <summary>
        /// Sets tags as "channelTag" or "channelTag,pubSubTag"; a null channel tag clears the tags.
        /// </summary>
        /// <exception cref="ArgumentException">if only <paramref name="pubSubTag"/> is supplied.</exception>
        public ChannelUriStringBuilder Tags(long? channelTag, long? pubSubTag)
        {
            if (!channelTag.HasValue)
            {
                if (pubSubTag.HasValue)
                {
                    throw new ArgumentException("null == channelTag && null != pubSubTag");
                }

                return Tags((string)null);
            }

            string pubSubSuffix = pubSubTag.HasValue ? "," + pubSubTag : "";
            return Tags(channelTag.ToString() + pubSubSuffix);
        }

        public string Tags() => _tags;

        public ChannelUriStringBuilder IsSessionIdTagged(bool isSessionIdTagged)
        {
            _isSessionIdTagged = isSessionIdTagged;
            return this;
        }

        public bool IsSessionIdTagged() => _isSessionIdTagged;

        public ChannelUriStringBuilder Alias(string alias)
        {
            _alias = alias;
            return this;
        }

        public ChannelUriStringBuilder Alias(ChannelUri channelUri)
        {
            string raw = channelUri.Get("alias");
            return Alias(raw);
        }

        public string Alias() => _alias;

        public ChannelUriStringBuilder CongestionControl(string congestionControl)
        {
            _cc = congestionControl;
            return this;
        }

        public ChannelUriStringBuilder CongestionControl(ChannelUri channelUri)
        {
            string raw = channelUri.Get("cc");
            return CongestionControl(raw);
        }

        public string CongestionControl() => _cc;

        public ChannelUriStringBuilder FlowControl(string flowControl)
        {
            _fc = flowControl;
            return this;
        }

        /// <summary>
        /// Sets flow control to "tagged[,g:[groupTag][/minGroupSize]][,t:timeout]", including only the
        /// parts whose arguments are supplied.
        /// </summary>
        public ChannelUriStringBuilder TaggedFlowControl(long? groupTag, int? minGroupSize, string timeout)
        {
            string flowControl = "tagged";

            if (groupTag.HasValue || minGroupSize.HasValue)
            {
                flowControl += ",g:";

                if (groupTag.HasValue)
                {
                    flowControl += groupTag;
                }

                if (minGroupSize.HasValue)
                {
                    flowControl += "/" + minGroupSize;
                }
            }

            if (timeout != null)
            {
                flowControl += ",t:" + timeout;
            }

            return FlowControl(flowControl);
        }

        /// <summary>
        /// Sets flow control to "min[,g:/minGroupSize][,t:timeout]", including only the parts whose
        /// arguments are supplied.
        /// </summary>
        public ChannelUriStringBuilder MinFlowControl(int? minGroupSize, string timeout)
        {
            string flowControl = "min";

            if (minGroupSize.HasValue)
            {
                flowControl += ",g:/" + minGroupSize;
            }

            if (timeout != null)
            {
                flowControl += ",t:" + timeout;
            }

            return FlowControl(flowControl);
        }

        public ChannelUriStringBuilder FlowControl(ChannelUri channelUri)
        {
            string raw = channelUri.Get("fc");
            return FlowControl(raw);
        }

        public string FlowControl() => _fc;

        public ChannelUriStringBuilder GroupTag(long?
groupTag) { _groupTag = groupTag; return this; }

        /// <summary>
        /// Takes the "gtag" parameter from <paramref name="channelUri"/>. An absent parameter clears
        /// the group tag; otherwise the text is converted with <c>Convert.ToInt64</c>.
        /// </summary>
        public ChannelUriStringBuilder GroupTag(ChannelUri channelUri)
        {
            string groupTagText = channelUri.Get("gtag");
            return GroupTag(groupTagText == null ? (long?)null : Convert.ToInt64(groupTagText));
        }

        /// <summary>Current group tag, or <c>null</c> when unset.</summary>
        public long? GroupTag() => _groupTag;

        /// <summary>Sets the rejoin flag; <c>null</c> leaves it unset.</summary>
        /// <returns>This builder, for chaining.</returns>
        public ChannelUriStringBuilder Rejoin(bool? rejoin)
        {
            _rejoin = rejoin;
            return this;
        }

        /// <summary>
        /// Takes the "rejoin" parameter from <paramref name="channelUri"/>; an absent parameter clears the flag.
        /// </summary>
        public ChannelUriStringBuilder Rejoin(ChannelUri channelUri)
        {
            string rejoinText = channelUri.Get("rejoin");
            return Rejoin(rejoinText == null ? (bool?)null : Convert.ToBoolean(rejoinText));
        }

        /// <summary>Current rejoin flag, or <c>null</c> when unset.</summary>
        public bool? Rejoin() => _rejoin;

        /// <summary>Sets the spies-simulate-connection ("ssc") flag; <c>null</c> leaves it unset.</summary>
        /// <returns>This builder, for chaining.</returns>
        public ChannelUriStringBuilder SpiesSimulateConnection(bool? spiesSimulateConnection)
        {
            _ssc = spiesSimulateConnection;
            return this;
        }

        /// <summary>
        /// Takes the "ssc" parameter from <paramref name="channelUri"/>; an absent parameter clears the flag.
        /// </summary>
        public ChannelUriStringBuilder SpiesSimulateConnection(ChannelUri channelUri)
        {
            string sscText = channelUri.Get("ssc");
            return SpiesSimulateConnection(sscText == null ? (bool?)null : Convert.ToBoolean(sscText));
        }

        /// <summary>Current ssc flag, or <c>null</c> when unset.</summary>
        public bool? SpiesSimulateConnection() => _ssc;

        /// <summary>
        /// Derives initial term id, term id, term offset and term length from an absolute position.
        /// </summary>
        /// <param name="position">Non-negative position that must be a multiple of 32.</param>
        /// <param name="initialTermId">Initial term id of the stream.</param>
        /// <param name="termLength">Term length; used as a mask (termLength - 1) for the offset.</param>
        /// <exception cref="ArgumentException">Position is negative or not 32-byte aligned.</exception>
        public ChannelUriStringBuilder InitialPosition(long position, int initialTermId, int termLength)
        {
            const int frameAlignment = 32;

            if (position < 0)
            {
                throw new ArgumentException("invalid position=" + position + " < 0");
            }

            if ((position & (frameAlignment - 1)) != 0L)
            {
                throw new ArgumentException("invalid position=" + position + " does not have frame alignment=" + frameAlignment);
            }

            // Computed before any field is touched, as in the original statement order.
            int bitsToShift = LogBufferDescriptor.PositionBitsToShift(termLength);

            _initialTermId = initialTermId;
            _termId = LogBufferDescriptor.ComputeTermIdFromPosition(position, bitsToShift, initialTermId);
            _termOffset = (int)(position & (termLength - 1));
            _termLength = termLength;
            return this;
        }

        /// <summary>Sets the socket send buffer length ("so-sndbuf"); <c>null</c> leaves it unset.</summary>
        /// <returns>This builder, for chaining.</returns>
        public ChannelUriStringBuilder SocketSndbufLength(int? socketSndbufLength)
        {
            _socketSndbufLength = socketSndbufLength;
            return this;
        }

        /// <summary>
        /// Takes the "so-sndbuf" parameter from <paramref name="channelUri"/>; an absent parameter clears the value.
        /// </summary>
        /// <exception cref="InvalidOperationException">The parsed size is greater than <c>int.MaxValue</c>.</exception>
        public ChannelUriStringBuilder SocketSndbufLength(ChannelUri channelUri)
        {
            return SocketSndbufLength(ParseBufferLengthParam(channelUri, "so-sndbuf"));
        }

        /// <summary>Current socket send buffer length, or <c>null</c> when unset.</summary>
        public int? SocketSndbufLength() => _socketSndbufLength;

        /// <summary>Sets the socket receive buffer length ("so-rcvbuf"); <c>null</c> leaves it unset.</summary>
        /// <returns>This builder, for chaining.</returns>
        public ChannelUriStringBuilder SocketRcvbufLength(int? socketRcvbufLength)
        {
            _socketRcvbufLength = socketRcvbufLength;
            return this;
        }

        /// <summary>
        /// Takes the "so-rcvbuf" parameter from <paramref name="channelUri"/>; an absent parameter clears the value.
        /// </summary>
        /// <exception cref="InvalidOperationException">The parsed size is greater than <c>int.MaxValue</c>.</exception>
        public ChannelUriStringBuilder SocketRcvbufLength(ChannelUri channelUri)
        {
            return SocketRcvbufLength(ParseBufferLengthParam(channelUri, "so-rcvbuf"));
        }

        /// <summary>Current socket receive buffer length, or <c>null</c> when unset.</summary>
        public int? SocketRcvbufLength() => _socketRcvbufLength;

        /// <summary>Sets the receiver window length ("rcv-wnd"); <c>null</c> leaves it unset.</summary>
        /// <returns>This builder, for chaining.</returns>
        public ChannelUriStringBuilder ReceiverWindowLength(int? receiverWindowLength)
        {
            _receiverWindowLength = receiverWindowLength;
            return this;
        }

        /// <summary>
        /// Takes the "rcv-wnd" parameter from <paramref name="channelUri"/>; an absent parameter clears the value.
        /// </summary>
        /// <exception cref="InvalidOperationException">The parsed size is greater than <c>int.MaxValue</c>.</exception>
        public ChannelUriStringBuilder ReceiverWindowLength(ChannelUri channelUri)
        {
            return ReceiverWindowLength(ParseBufferLengthParam(channelUri, "rcv-wnd"));
        }

        /// <summary>
        /// Reads <paramref name="paramName"/> from the URI and parses it with <c>SystemUtil.ParseSize</c>.
        /// Returns <c>null</c> when the parameter is absent.
        /// </summary>
        private static int? ParseBufferLengthParam(ChannelUri channelUri, string paramName)
        {
            string sizeText = channelUri.Get(paramName);
            if (sizeText == null)
            {
                return null;
            }

            long size = SystemUtil.ParseSize(paramName, sizeText);
            if (size > int.MaxValue)
            {
                throw new InvalidOperationException("value exceeds maximum permitted: value=" + size);
            }

            return (int)size;
        }

        public int? 
ReceiverWindowLength() { return _receiverWindowLength; }

        /// <summary>Current media receive timestamp offset, or <c>null</c> when unset.</summary>
        public string MediaReceiveTimestampOffset()
        {
            return _mediaReceiveTimestampOffset;
        }

        /// <summary>
        /// Sets the media receive timestamp offset. Accepts <c>null</c>, the literal "reserved",
        /// or text that parses as a 32-bit integer.
        /// </summary>
        /// <exception cref="ArgumentException">
        /// The value is neither "reserved" nor a valid 32-bit integer (including out-of-range numbers).
        /// </exception>
        public ChannelUriStringBuilder MediaReceiveTimestampOffset(string timestampOffset)
        {
            if (!IsReservedOrInt32(timestampOffset))
            {
                throw new ArgumentException("mediaReceiveTimestampOffset must be a number or the value 'reserved'");
            }

            _mediaReceiveTimestampOffset = timestampOffset;
            return this;
        }

        /// <summary>Takes the "media-rcv-ts-offset" parameter from <paramref name="channelUri"/>.</summary>
        public ChannelUriStringBuilder MediaReceiveTimestampOffset(ChannelUri channelUri)
        {
            return MediaReceiveTimestampOffset(channelUri.Get("media-rcv-ts-offset"));
        }

        /// <summary>Current channel receive timestamp offset, or <c>null</c> when unset.</summary>
        public string ChannelReceiveTimestampOffset()
        {
            return _channelReceiveTimestampOffset;
        }

        /// <summary>
        /// Sets the channel receive timestamp offset. Accepts <c>null</c>, the literal "reserved",
        /// or text that parses as a 32-bit integer.
        /// </summary>
        /// <exception cref="ArgumentException">
        /// The value is neither "reserved" nor a valid 32-bit integer (including out-of-range numbers).
        /// </exception>
        public ChannelUriStringBuilder ChannelReceiveTimestampOffset(string timestampOffset)
        {
            if (!IsReservedOrInt32(timestampOffset))
            {
                throw new ArgumentException("channelReceiveTimestampOffset must be a number or the value 'reserved'");
            }

            _channelReceiveTimestampOffset = timestampOffset;
            return this;
        }

        /// <summary>Takes the "channel-rcv-ts-offset" parameter from <paramref name="channelUri"/>.</summary>
        public ChannelUriStringBuilder ChannelReceiveTimestampOffset(ChannelUri channelUri)
        {
            return ChannelReceiveTimestampOffset(channelUri.Get("channel-rcv-ts-offset"));
        }

        /// <summary>Current channel send timestamp offset, or <c>null</c> when unset.</summary>
        public string ChannelSendTimestampOffset()
        {
            return _channelSendTimestampOffset;
        }

        /// <summary>
        /// Sets the channel send timestamp offset. Accepts <c>null</c>, the literal "reserved",
        /// or text that parses as a 32-bit integer.
        /// </summary>
        /// <exception cref="ArgumentException">
        /// The value is neither "reserved" nor a valid 32-bit integer (including out-of-range numbers).
        /// </exception>
        public ChannelUriStringBuilder ChannelSendTimestampOffset(string timestampOffset)
        {
            if (!IsReservedOrInt32(timestampOffset))
            {
                throw new ArgumentException("channelSendTimestampOffset must be a number or the value 'reserved' found: " + timestampOffset);
            }

            _channelSendTimestampOffset = timestampOffset;
            return this;
        }

        /// <summary>Takes the "channel-snd-ts-offset" parameter from <paramref name="channelUri"/>.</summary>
        public ChannelUriStringBuilder ChannelSendTimestampOffset(ChannelUri channelUri)
        {
            return ChannelSendTimestampOffset(channelUri.Get("channel-snd-ts-offset"));
        }

        /// <summary>
        /// True when <paramref name="timestampOffset"/> is null, equals "reserved", or parses as an int.
        /// </summary>
        /// <remarks>
        /// Uses <c>int.TryParse</c> rather than <c>int.Parse</c> guarded by <c>catch (FormatException)</c>:
        /// the latter let an <c>OverflowException</c> escape for numeric text outside the int range
        /// instead of reporting it as an invalid argument.
        /// </remarks>
        private static bool IsReservedOrInt32(string timestampOffset)
        {
            return timestampOffset == null
                || "reserved".Equals(timestampOffset)
                || int.TryParse(timestampOffset, out _);
        }

        public 
string Build()
        {
            // Appends "name=value|" when the value is set. Nullable value types box to null
            // when empty, so a single null check covers both strings and nullables.
            void AppendParam(string name, object value)
            {
                if (value != null)
                {
                    _sb.Append(name).Append('=').Append(value).Append('|');
                }
            }

            // The builder instance is reused between calls, so start from empty.
            _sb.Length = 0;

            if (!string.IsNullOrEmpty(_prefix))
            {
                _sb.Append(_prefix).Append(':');
            }

            _sb.Append("aeron").Append(':').Append(_media).Append('?');

            AppendParam("tags", _tags);
            AppendParam("endpoint", _endpoint);
            AppendParam("interface", _networkInterface);
            AppendParam("control", _controlEndpoint);
            AppendParam("control-mode", _controlMode);
            AppendParam("mtu", _mtu);
            AppendParam("term-length", _termLength);
            AppendParam("init-term-id", _initialTermId);
            AppendParam("term-id", _termId);
            AppendParam("term-offset", _termOffset);

            if (_sessionId.HasValue)
            {
                AppendParam("session-id", PrefixTag(_isSessionIdTagged, _sessionId.Value));
            }

            AppendParam("ttl", _ttl);

            // NOTE(review): nullable bool flags are written through their default ToString(),
            // which yields "True"/"False" - confirm the consumer of the URI accepts that casing.
            AppendParam("reliable", _reliable);
            AppendParam("linger", _linger);
            AppendParam("alias", _alias);
            AppendParam("cc", _cc);
            AppendParam("fc", _fc);
            AppendParam("gtag", _groupTag);
            AppendParam("sparse", _sparse);
            AppendParam("eos", _eos);
            AppendParam("tether", _tether);
            AppendParam("group", _group);
            AppendParam("rejoin", _rejoin);
            AppendParam("ssc", _ssc);
            AppendParam("so-sndbuf", _socketSndbufLength);
            AppendParam("so-rcvbuf", _socketRcvbufLength);
            AppendParam("rcv-wnd", _receiverWindowLength);
            AppendParam("media-rcv-ts-offset", _mediaReceiveTimestampOffset);
            AppendParam("channel-rcv-ts-offset", _channelReceiveTimestampOffset);
            AppendParam("channel-snd-ts-offset", _channelSendTimestampOffset);

            // Drop the dangling separator: '|' after the last parameter, or '?' when there were none.
            char lastChar = _sb[_sb.Length - 1];
            if (lastChar == '|' || lastChar == '?')
            {
                _sb.Length -= 1;
            }

            return _sb.ToString();
        }

        /// <summary>Same as <see cref="Build"/>.</summary>
        public override string ToString() => Build();

        private static string PrefixTag(bool isTagged, long? value)
        {
            if (!isTagged)
            {
                return value.ToString();
            }
            long? 
num = value;
            return "tag:" + num;
        }
    }

    /// <summary>
    /// Client-side conductor agent: holds the client's registered resources and the state
    /// needed to talk to the driver through the driver proxy and the driver events adapter.
    /// </summary>
    internal class ClientConductor : IAgent
    {
        private const long NO_CORRELATION_ID = -1L;

        // One second, expressed in nanoseconds.
        private static readonly long EXPLICIT_CLOSE_LINGER_NS = 1000000000L;

        // Timing configuration captured from the context at construction.
        private readonly long _keepAliveIntervalNs;
        private readonly long _driverTimeoutMs;
        private readonly long _driverTimeoutNs;
        private readonly long _interServiceTimeoutNs;

        private long _timeOfLastKeepAliveNs;
        private long _timeOfLastServiceNs;

        // Lifecycle flags.
        private bool _isClosed;
        private bool _isInCallback;
        private bool _isTerminating;

        private RegistrationException _driverException;

        // Collaborators taken from the context.
        private readonly Aeron.Context _ctx;
        private readonly Aeron _aeron;
        private readonly ILock _clientLock;
        private readonly IEpochClock _epochClock;
        private readonly INanoClock _nanoClock;
        private readonly IIdleStrategy _awaitingIdleStrategy;
        private readonly DriverEventsAdapter _driverEventsAdapter;
        private readonly ILogBuffersFactory _logBuffersFactory;

        // Registries keyed by registration / correlation id.
        private readonly Map<long, LogBuffers> _logBuffersByIdMap = new Map<long, LogBuffers>((LogBuffers)null);
        private readonly List<LogBuffers> _lingeringLogBuffers = new List<LogBuffers>();
        private readonly Map<long, object> _resourceByRegIdMap = new Map<long, object>((object)null);
        private readonly Map<long, string> _stashedChannelByRegistrationId = new Map<long, string>((string)null);
        private readonly HashSet<long> _asyncCommandIdSet = new HashSet<long>();

        // Handlers.
        private readonly AvailableImageHandler _defaultAvailableImageHandler;
        private readonly UnavailableImageHandler _defaultUnavailableImageHandler;
        private readonly Map<long, AvailableCounterHandler> _availableCounterHandlers = new Map<long, AvailableCounterHandler>((AvailableCounterHandler)null);
        private readonly Map<long, UnavailableCounterHandler> _unavailableCounterHandlers = new Map<long, UnavailableCounterHandler>((UnavailableCounterHandler)null);
        private readonly Map<long, Action> _closeHandlersByIdMap = new Map<long, Action>((Action)null);

        private readonly DriverProxy _driverProxy;
        private readonly AgentInvoker _driverAgentInvoker;
        private readonly UnsafeBuffer _counterValuesBuffer;
        private readonly CountersReader _countersReader;
        private AtomicCounter _heartbeatTimestamp;

        /// <summary>
        /// Wires the conductor from <paramref name="ctx"/>, registers any counter/close handlers
        /// the context supplies, and stamps the keep-alive and service times with the current nano time.
        /// </summary>
        internal ClientConductor(Aeron.Context ctx, Aeron aeron)
        {
            _ctx = ctx;
            _aeron = aeron;

            _clientLock = ctx.ClientLock();
            _epochClock = ctx.EpochClock();
            _nanoClock = ctx.NanoClock();
            _awaitingIdleStrategy = ctx.AwaitingIdleStrategy();
            _driverProxy = ctx.DriverProxy();
            _logBuffersFactory = ctx.LogBuffersFactory();

            _keepAliveIntervalNs = ctx.KeepAliveIntervalNs();
            _driverTimeoutMs = ctx.DriverTimeoutMs();
            _driverTimeoutNs = _driverTimeoutMs * 1000000;
            _interServiceTimeoutNs = ctx.InterServiceTimeoutNs();

            _defaultAvailableImageHandler = ctx.AvailableImageHandler();
            _defaultUnavailableImageHandler = ctx.UnavailableImageHandler();

            _driverEventsAdapter = new DriverEventsAdapter(ctx.ClientId(), ctx.ToClientBuffer(), this, _asyncCommandIdSet);
            _driverAgentInvoker = ctx.DriverAgentInvoker();
            _counterValuesBuffer = ctx.CountersValuesBuffer();
            _countersReader = new CountersReader(
                (IAtomicBuffer)(object)ctx.CountersMetaDataBuffer(),
                (IAtomicBuffer)(object)ctx.CountersValuesBuffer(),
                Encoding.ASCII);

            // Each context-supplied handler is stored under a freshly allocated correlation id.
            if (ctx.AvailableCounterHandler() != null)
            {
                _availableCounterHandlers.Put(aeron.NextCorrelationId(), ctx.AvailableCounterHandler());
            }

            if (ctx.UnavailableCounterHandler() != null)
            {
                _unavailableCounterHandlers.Put(aeron.NextCorrelationId(), ctx.UnavailableCounterHandler());
            }

            if (ctx.CloseHandler() != null)
            {
                _closeHandlersByIdMap.Put(aeron.NextCorrelationId(), ctx.CloseHandler());
            }

            long nowNs = _nanoClock.NanoTime();
            _timeOfLastKeepAliveNs = nowNs;
            _timeOfLastServiceNs = nowNs;
        }

        /// <summary>No start-up work is performed.</summary>
        public void OnStart()
        {
        }

        public void OnClose()
        {
            bool flag = false;
            _clientLock.Lock();
            try
            {
                if (_isClosed)
                {
                    return;
                }
                if (!_aeron.IsClosed)
                {
                    _aeron.InternalClose();
                }
                bool isTerminating = _isTerminating;
                _isTerminating = true;
ForceCloseResources();
                NotifyCloseHandlers();
                try
                {
                    if (isTerminating)
                    {
                        Thread.Sleep(Aeron.Configuration.IdleSleepMs);
                    }
                    Thread.Sleep((int)TimeUnit.NANOSECONDS.ToMillis(_ctx.CloseLingerDurationNs()));
                }
                catch (ThreadInterruptedException)
                {
                    flag = true;
                }
                foreach (LogBuffers lingeringLogBuffer in _lingeringLogBuffers)
                {
                    CloseHelper.Dispose(_ctx.ErrorHandler(), (IDisposable)lingeringLogBuffer);
                }
                _driverProxy.ClientClose();
                _ctx.Dispose();
                _ctx.CountersMetaDataBuffer().Wrap(0, 0);
                _ctx.CountersValuesBuffer().Wrap(0, 0);
            }
            finally
            {
                _isClosed = true;
                if (flag)
                {
                    Thread.CurrentThread.Interrupt();
                }
                _clientLock.Unlock();
            }
        }

        /// <summary>
        /// Runs one service pass if the client lock can be taken without waiting.
        /// </summary>
        /// <returns>The value returned by <c>Service</c>, or 0 when the lock was not acquired.</returns>
        /// <exception cref="AgentTerminationException">The conductor is terminating.</exception>
        public int DoWork()
        {
            if (!_clientLock.TryLock())
            {
                return 0;
            }

            try
            {
                if (_isTerminating)
                {
                    throw new AgentTerminationException();
                }

                return Service(-1L);
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        /// <summary>Name of this agent's role.</summary>
        public string RoleName() => "aeron-client-conductor";

        /// <summary>Whether the conductor has been closed.</summary>
        internal bool IsClosed() => _isClosed;

        /// <summary>Whether the conductor is terminating.</summary>
        internal bool IsTerminating() => _isTerminating;

        /// <summary>
        /// Records a driver error as the pending driver exception. If the correlation id belongs to
        /// a registered subscription, that subscription is closed and removed; otherwise, if it is a
        /// pending async command, its stashed channel is dropped and the error is passed to the error handler.
        /// </summary>
        internal void OnError(long correlationId, int codeValue, ErrorCode errorCode, string message)
        {
            _driverException = new RegistrationException(correlationId, codeValue, errorCode, message);

            object resource = _resourceByRegIdMap.Get(correlationId);
            if (resource is Subscription failedSubscription)
            {
                failedSubscription.InternalClose(-1L);
                _resourceByRegIdMap.Remove(correlationId);
                return;
            }

            if (_asyncCommandIdSet.Remove(correlationId))
            {
                _stashedChannelByRegistrationId.Remove(correlationId);
                HandleError(new RegistrationException(correlationId, codeValue, errorCode, message));
            }
        }

        /// <summary>
        /// Drops the stashed channel for the correlation id and passes a
        /// <c>RegistrationException</c> to the error handler.
        /// </summary>
        internal void OnAsyncError(long correlationId, int codeValue, ErrorCode errorCode, string message)
        {
            _stashedChannelByRegistrationId.Remove(correlationId);

            var registrationError = new RegistrationException(correlationId, codeValue, errorCode, message);
            HandleError(registrationError);
        }

        internal void OnChannelEndpointError(long correlationId, string message)
        {
            int num = (int)correlationId;
            foreach (object 
value in _resourceByRegIdMap.Values)
            {
                if (value is Subscription subscription)
                {
                    if (subscription.ChannelStatusId == num)
                    {
                        HandleError(new ChannelEndpointException(num, message));
                    }
                }
                else if (value is Publication publication && publication.ChannelStatusId == num)
                {
                    HandleError(new ChannelEndpointException(num, message));
                }
            }
            if (_asyncCommandIdSet.Remove(correlationId))
            {
                _stashedChannelByRegistrationId.Remove(correlationId);
                HandleError(new RegistrationException(correlationId, 4, ErrorCode.CHANNEL_ENDPOINT_ERROR, message));
            }
        }

        /// <summary>
        /// Creates a <c>ConcurrentPublication</c> for a driver-confirmed publication and registers it
        /// under <paramref name="correlationId"/>. The channel is taken (and removed) from the stash
        /// recorded when the publication was requested.
        /// </summary>
        internal void OnNewPublication(long correlationId, long registrationId, int streamId, int sessionId, int publicationLimitId, int statusIndicatorId, string logFileName)
        {
            string channel = _stashedChannelByRegistrationId.Remove(correlationId);

            // Created in the same order as the original argument list: limit position first, then log buffers.
            var publicationLimit = (IReadablePosition)new UnsafeBufferPosition(_counterValuesBuffer, publicationLimitId);
            var logBuffers = LogBuffers(registrationId, logFileName, channel);

            ConcurrentPublication publication = new ConcurrentPublication(
                this,
                channel,
                streamId,
                sessionId,
                publicationLimit,
                statusIndicatorId,
                logBuffers,
                registrationId,
                correlationId);

            _resourceByRegIdMap.Put(correlationId, (object)publication);
        }

        internal void OnNewExclusivePublication(long correlationId, long registrationId, int streamId, int sessionId, int publicationLimitId, int statusIndicatorId, string logFileName)
        {
            if (correlationId != registrationId)
            {
                HandleError(new InvalidOperationException("correlationId=" + correlationId + " registrationId=" + registrationId));
            }
            string channel = _stashedChannelByRegistrationId.Remove(correlationId);
            ExclusivePublication exclusivePublication = new ExclusivePublication(this, channel, streamId, sessionId, (IReadablePosition)new UnsafeBufferPosition(_counterValuesBuffer, publicationLimitId), 
statusIndicatorId, LogBuffers(registrationId, logFileName, channel), registrationId, correlationId);
            _resourceByRegIdMap.Put(correlationId, (object)exclusivePublication);
        }

        /// <summary>
        /// Stores the channel status indicator id on the subscription registered under
        /// <paramref name="correlationId"/>.
        /// </summary>
        internal void OnNewSubscription(long correlationId, int statusIndicatorId)
        {
            var subscription = (Subscription)_resourceByRegIdMap.Get(correlationId);
            subscription.ChannelStatusId = statusIndicatorId;
        }

        /// <summary>
        /// Builds an <c>Image</c> for the subscription registered under
        /// <paramref name="subscriptionRegistrationId"/>, invokes the subscription's available-image
        /// handler (if any), then adds the image to the subscription. Does nothing when no such
        /// subscription is registered.
        /// </summary>
        internal void OnAvailableImage(long correlationId, int sessionId, long subscriptionRegistrationId, int subscriberPositionId, string logFileName, string sourceIdentity)
        {
            var subscription = (Subscription)_resourceByRegIdMap.Get(subscriptionRegistrationId);
            if (subscription == null)
            {
                return;
            }

            // Created in the same order as the original argument list.
            var subscriberPosition = (IPosition)new UnsafeBufferPosition(_counterValuesBuffer, subscriberPositionId);
            var logBuffers = LogBuffers(correlationId, logFileName, subscription.Channel);

            Image image = new Image(
                subscription,
                sessionId,
                subscriberPosition,
                logBuffers,
                _ctx.SubscriberErrorHandler(),
                sourceIdentity,
                correlationId);

            AvailableImageHandler onAvailable = subscription.AvailableImageHandler;
            if (onAvailable != null)
            {
                // Flag the callback window; exceptions from user code go to the error handler.
                _isInCallback = true;
                try
                {
                    onAvailable(image);
                }
                catch (Exception callbackError)
                {
                    HandleError(callbackError);
                }
                finally
                {
                    _isInCallback = false;
                }
            }

            subscription.AddImage(image);
        }

        /// <summary>
        /// Removes the image identified by <paramref name="correlationId"/> from its subscription and,
        /// when an image was removed and the subscription has an unavailable-image handler, notifies it.
        /// </summary>
        internal void OnUnavailableImage(long correlationId, long subscriptionRegistrationId)
        {
            var subscription = (Subscription)_resourceByRegIdMap.Get(subscriptionRegistrationId);
            if (subscription == null)
            {
                return;
            }

            Image removedImage = subscription.RemoveImage(correlationId);
            if (removedImage == null)
            {
                return;
            }

            UnavailableImageHandler onUnavailable = subscription.UnavailableImageHandler;
            if (onUnavailable != null)
            {
                NotifyImageUnavailable(onUnavailable, removedImage);
            }
        }

        internal void OnNewCounter(long correlationId, int counterId)
        {
            _resourceByRegIdMap.Put(correlationId, (object)new Counter(correlationId, this, (IAtomicBuffer)(object)_counterValuesBuffer, 
counterId));
            OnAvailableCounter(correlationId, counterId);
        }

        /// <summary>Notifies every registered available-counter handler.</summary>
        internal void OnAvailableCounter(long registrationId, int counterId)
        {
            foreach (AvailableCounterHandler handler in _availableCounterHandlers.Values)
            {
                NotifyCounterAvailable(registrationId, counterId, handler);
            }
        }

        /// <summary>Forwards to <c>NotifyUnavailableCounterHandlers</c>.</summary>
        internal void OnUnavailableCounter(long registrationId, int counterId)
        {
            NotifyUnavailableCounterHandlers(registrationId, counterId);
        }

        /// <summary>
        /// Unless already closed: marks the conductor as terminating, force-closes resources and
        /// reports a <c>ClientTimeoutException</c> to the error handler.
        /// </summary>
        internal void OnClientTimeout()
        {
            if (_isClosed)
            {
                return;
            }

            _isTerminating = true;
            ForceCloseResources();
            HandleError(new ClientTimeoutException("client timeout from driver"));
        }

        /// <summary>The counters reader created at construction.</summary>
        internal CountersReader CountersReader() => _countersReader;

        /// <summary>Passes <paramref name="ex"/> to the context's error handler unless the conductor is closed.</summary>
        internal void HandleError(Exception ex)
        {
            if (_isClosed)
            {
                return;
            }

            _ctx.ErrorHandler().Invoke(ex);
        }

        /// <summary>
        /// Requests a publication from the driver, stashes the channel under the returned id,
        /// awaits the response and returns the resource registered under that id.
        /// </summary>
        internal ConcurrentPublication AddPublication(string channel, int streamId)
        {
            _clientLock.Lock();
            try
            {
                EnsureActive();
                EnsureNotReentrant();

                long registrationId = _driverProxy.AddPublication(channel, streamId);
                _stashedChannelByRegistrationId.Put(registrationId, channel);
                AwaitResponse(registrationId);

                object resource = _resourceByRegIdMap.Get(registrationId);
                return (ConcurrentPublication)resource;
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        /// <summary>
        /// Requests an exclusive publication from the driver, stashes the channel under the returned id,
        /// awaits the response and returns the resource registered under that id.
        /// </summary>
        internal ExclusivePublication AddExclusivePublication(string channel, int streamId)
        {
            _clientLock.Lock();
            try
            {
                EnsureActive();
                EnsureNotReentrant();

                long registrationId = _driverProxy.AddExclusivePublication(channel, streamId);
                _stashedChannelByRegistrationId.Put(registrationId, channel);
                AwaitResponse(registrationId);

                object resource = _resourceByRegIdMap.Get(registrationId);
                return (ExclusivePublication)resource;
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        /// <summary>
        /// Requests a publication without waiting for the driver: stashes the channel, records the
        /// id as a pending async command and returns it.
        /// </summary>
        internal long AsyncAddPublication(string channel, int streamId)
        {
            _clientLock.Lock();
            try
            {
                EnsureActive();
                EnsureNotReentrant();

                long registrationId = _driverProxy.AddPublication(channel, streamId);
                _stashedChannelByRegistrationId.Put(registrationId, channel);
                _asyncCommandIdSet.Add(registrationId);
                return registrationId;
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        internal long AsyncAddExclusivePublication(string channel, int streamId)
        {
            _clientLock.Lock();
            try
            {
                EnsureActive();
                EnsureNotReentrant();
                long num = 
_driverProxy.AddExclusivePublication(channel, streamId);
                _stashedChannelByRegistrationId.Put(num, channel);
                _asyncCommandIdSet.Add(num);
                return num;
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        /// <summary>
        /// Returns the resource registered under <paramref name="registrationId"/> cast to
        /// <c>ConcurrentPublication</c>. If the id is still a pending async command, one service
        /// pass is run first.
        /// </summary>
        internal ConcurrentPublication GetPublication(long registrationId)
        {
            _clientLock.Lock();
            try
            {
                EnsureActive();
                EnsureNotReentrant();

                bool stillPending = _asyncCommandIdSet.Contains(registrationId);
                if (stillPending)
                {
                    Service(-1L);
                }

                object resource = _resourceByRegIdMap.Get(registrationId);
                return (ConcurrentPublication)resource;
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        /// <summary>
        /// Returns the resource registered under <paramref name="registrationId"/> cast to
        /// <c>ExclusivePublication</c>. If the id is still a pending async command, one service
        /// pass is run first.
        /// </summary>
        internal ExclusivePublication GetExclusivePublication(long registrationId)
        {
            _clientLock.Lock();
            try
            {
                EnsureActive();
                EnsureNotReentrant();

                bool stillPending = _asyncCommandIdSet.Contains(registrationId);
                if (stillPending)
                {
                    Service(-1L);
                }

                object resource = _resourceByRegIdMap.Get(registrationId);
                return (ExclusivePublication)resource;
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        /// <summary>
        /// Closes <paramref name="publication"/> and, if it was the resource registered under its id,
        /// releases its log buffers and sends a remove command to the driver (tracked as an async
        /// command). Does nothing when terminating, closed, or the publication is already closed.
        /// </summary>
        internal void RemovePublication(Publication publication)
        {
            _clientLock.Lock();
            try
            {
                if (_isTerminating || _isClosed || publication.IsClosed)
                {
                    return;
                }

                EnsureNotReentrant();
                publication.InternalClose();

                object removed = _resourceByRegIdMap.Remove(publication.RegistrationId);
                if (publication == removed)
                {
                    ReleaseLogBuffers(publication.LogBuffers, publication.OriginalRegistrationId, EXPLICIT_CLOSE_LINGER_NS);

                    long removeCommandId = _driverProxy.RemovePublication(publication.RegistrationId);
                    _asyncCommandIdSet.Add(removeCommandId);
                }
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        internal void RemovePublication(long publicationRegistrationId)
        {
            _clientLock.Lock();
            try
            {
                if (-1 != publicationRegistrationId && !_isTerminating && !_isClosed)
                {
                    EnsureNotReentrant();
                    object obj = _resourceByRegIdMap.Get(publicationRegistrationId);
                    if (obj != null && !(obj is Publication))
                    {
                        throw new AeronException("registration id is not a Publication");
                    }
                    Publication publication = (Publication)obj;
                    if (publication != null)
                    {
                        _resourceByRegIdMap.Remove(publicationRegistrationId);
                        publication.InternalClose();
                        ReleaseLogBuffers(publication.LogBuffers, publication.OriginalRegistrationId, EXPLICIT_CLOSE_LINGER_NS);
                    }
                    if (_asyncCommandIdSet.Remove(publicationRegistrationId) 
|| publication != null)
                    {
                        _driverProxy.RemovePublication(publicationRegistrationId);
                        _stashedChannelByRegistrationId.Remove(publicationRegistrationId);
                    }
                }
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        /// <summary>Adds a subscription using the default image handlers taken from the context.</summary>
        internal Subscription AddSubscription(string channel, int streamId) =>
            AddSubscription(channel, streamId, _defaultAvailableImageHandler, _defaultUnavailableImageHandler);

        /// <summary>
        /// Requests a subscription from the driver, registers the new <c>Subscription</c> under the
        /// returned id, awaits the driver's response and returns the subscription.
        /// </summary>
        internal Subscription AddSubscription(string channel, int streamId, AvailableImageHandler availableImageHandler, UnavailableImageHandler unavailableImageHandler)
        {
            _clientLock.Lock();
            try
            {
                EnsureActive();
                EnsureNotReentrant();

                long registrationId = _driverProxy.AddSubscription(channel, streamId);
                Subscription created = new Subscription(this, channel, streamId, registrationId, availableImageHandler, unavailableImageHandler);

                // Registered before awaiting the response.
                _resourceByRegIdMap.Put(registrationId, (object)created);
                AwaitResponse(registrationId);

                return created;
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        /// <summary>
        /// Closes <paramref name="subscription"/> and, if it was the resource registered under its id,
        /// sends a remove command to the driver (tracked as an async command). Does nothing when
        /// terminating, closed, or the subscription is already closed.
        /// </summary>
        internal void RemoveSubscription(Subscription subscription)
        {
            _clientLock.Lock();
            try
            {
                if (_isTerminating || _isClosed || subscription.IsClosed)
                {
                    return;
                }

                EnsureNotReentrant();
                subscription.InternalClose(EXPLICIT_CLOSE_LINGER_NS);

                long subscriptionId = subscription.RegistrationId;
                object removed = _resourceByRegIdMap.Remove(subscriptionId);
                if (subscription == removed)
                {
                    long removeCommandId = _driverProxy.RemoveSubscription(subscriptionId);
                    _asyncCommandIdSet.Add(removeCommandId);
                }
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        /// <summary>Sends an add-destination command to the driver and awaits the response.</summary>
        internal void AddDestination(long registrationId, string endpointChannel)
        {
            _clientLock.Lock();
            try
            {
                EnsureActive();
                EnsureNotReentrant();

                long commandId = _driverProxy.AddDestination(registrationId, endpointChannel);
                AwaitResponse(commandId);
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        /// <summary>Sends a remove-destination command to the driver and awaits the response.</summary>
        internal void RemoveDestination(long registrationId, string endpointChannel)
        {
            _clientLock.Lock();
            try
            {
                EnsureActive();
                EnsureNotReentrant();

                long commandId = _driverProxy.RemoveDestination(registrationId, endpointChannel);
                AwaitResponse(commandId);
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        internal void AddRcvDestination(long registrationId, string endpointChannel)
        {
            _clientLock.Lock();
            try
            {
EnsureActive(); EnsureNotReentrant(); AwaitResponse(_driverProxy.AddRcvDestination(registrationId, endpointChannel)); } finally { _clientLock.Unlock(); } } internal void RemoveRcvDestination(long registrationId, string endpointChannel) { _clientLock.Lock(); try { EnsureActive(); EnsureNotReentrant(); AwaitResponse(_driverProxy.RemoveRcvDestination(registrationId, endpointChannel)); } finally { _clientLock.Unlock(); } } internal long AsyncAddDestination(long registrationId, string endpointChannel) { _clientLock.Lock(); try { EnsureActive(); EnsureNotReentrant(); long num = _driverProxy.AddDestination(registrationId, endpointChannel); _asyncCommandIdSet.Add(num); return num; } finally { _clientLock.Unlock(); } } internal long AsyncRemoveDestination(long registrationId, string endpointChannel) { _clientLock.Lock(); try { EnsureActive(); EnsureNotReentrant(); long num = _driverProxy.RemoveDestination(registrationId, endpointChannel); _asyncCommandIdSet.Add(num); return num; } finally { _clientLock.Unlock(); } } internal long AsyncAddRcvDestination(long registrationId, string endpointChannel) { _clientLock.Lock(); try { EnsureActive(); EnsureNotReentrant(); long num = _driverProxy.AddRcvDestination(registrationId, endpointChannel); _asyncCommandIdSet.Add(num); return num; } finally { _clientLock.Unlock(); } } internal long AsyncRemoveRcvDestination(long registrationId, string endpointChannel) { _clientLock.Lock(); try { EnsureActive(); EnsureNotReentrant(); long num = _driverProxy.RemoveRcvDestination(registrationId, endpointChan