Please disclose if any significant portion of your mod was created using AI tools by adding the 'AI Generated' category. Failing to do so may result in the mod being removed from Thunderstore.
Decompiled source of Aeron Client v1.40.0
BepInEx/core/Aeron.Client/netstandard2.0/Adaptive.Aeron.dll
Decompiled 11 months ago
The result has been truncated due to its large size; download it to view the full contents.
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Diagnostics; using System.IO; using System.IO.MemoryMappedFiles; using System.Linq; using System.Reflection; using System.Runtime.CompilerServices; using System.Runtime.Serialization; using System.Runtime.Versioning; using System.Text; using System.Threading; using Adaptive.Aeron.Command; using Adaptive.Aeron.Exceptions; using Adaptive.Aeron.LogBuffer; using Adaptive.Aeron.Status; using Adaptive.Agrona; using Adaptive.Agrona.Collections; using Adaptive.Agrona.Concurrent; using Adaptive.Agrona.Concurrent.Broadcast; using Adaptive.Agrona.Concurrent.Errors; using Adaptive.Agrona.Concurrent.RingBuffer; using Adaptive.Agrona.Concurrent.Status; using Adaptive.Agrona.Util; [assembly: CompilationRelaxations(8)] [assembly: RuntimeCompatibility(WrapNonExceptionThrows = true)] [assembly: Debuggable(DebuggableAttribute.DebuggingModes.IgnoreSymbolStoreSequencePoints)] [assembly: InternalsVisibleTo("Adaptive.Aeron.Tests")] [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] [assembly: TargetFramework(".NETStandard,Version=v2.0", FrameworkDisplayName = "")] [assembly: AssemblyCompany("Adaptive Financial Consulting Ltd.")] [assembly: AssemblyConfiguration("Release")] [assembly: AssemblyCopyright("Copyright 2023")] [assembly: AssemblyDescription("Efficient reliable UDP unicast, UDP multicast, and IPC transport protocol.")] [assembly: AssemblyFileVersion("1.40.0.0")] [assembly: AssemblyInformationalVersion("1.40.0")] [assembly: AssemblyProduct("Aeron Client")] [assembly: AssemblyTitle("Adaptive.Aeron")] [assembly: AssemblyVersion("1.40.0.0")] namespace Adaptive.Aeron { public class Aeron : IDisposable { public static class Configuration { public static readonly int IdleSleepMs = 16; public static readonly int AWAITING_IDLE_SLEEP_MS = 1; public static readonly long IdleSleepNs = NanoUtil.FromMilliseconds((long)IdleSleepMs); public static 
readonly long KeepaliveIntervalNs = NanoUtil.FromMilliseconds(500L); public const string RESOURCE_LINGER_DURATION_PROP_NAME = "aeron.client.resource.linger.duration"; public static readonly long RESOURCE_LINGER_DURATION_DEFAULT_NS = NanoUtil.FromSeconds(3L); public const string CLOSE_LINGER_DURATION_PROP_NAME = "aeron.client.close.linger.duration"; public const long CLOSE_LINGER_DURATION_DEFAULT_NS = 0L; public const string PRE_TOUCH_MAPPED_MEMORY_PROP_NAME = "aeron.pre.touch.mapped.memory"; public const bool PRE_TOUCH_MAPPED_MEMORY_DEFAULT = false; public static readonly ErrorHandler DEFAULT_ERROR_HANDLER = (ErrorHandler)delegate(Exception throwable) { lock (Console.Error) { Console.Error.WriteLine(throwable); } if (throwable is DriverTimeoutException) { Console.Error.WriteLine(); Console.Error.WriteLine("***"); Console.Error.WriteLine("*** Timeout for the Media Driver - is it currently running? exiting"); Console.Error.WriteLine("***"); Environment.Exit(-1); } }; public static long ResourceLingerDurationNs() { return Config.GetDurationInNanos("aeron.client.resource.linger.duration", RESOURCE_LINGER_DURATION_DEFAULT_NS); } public static long CloseLingerDurationNs() { return Config.GetDurationInNanos("aeron.client.close.linger.duration", 0L); } public static bool PreTouchMappedMemory() { string property = Config.GetProperty("aeron.pre.touch.mapped.memory"); if (property != null) { return bool.Parse(property); } return false; } } public class Context : IDisposable { private long _clientId; private bool _useConductorAgentInvoker; private bool _preTouchMappedMemory = Configuration.PreTouchMappedMemory(); private AgentInvoker _driverAgentInvoker; private ILock _clientLock; private IEpochClock _epochClock; private INanoClock _nanoClock; private IIdleStrategy _idleStrategy; private IIdleStrategy _awaitingIdleStrategy; private CopyBroadcastReceiver _toClientBuffer; private IRingBuffer _toDriverBuffer; private DriverProxy _driverProxy; private ILogBuffersFactory 
_logBuffersFactory; private ErrorHandler _errorHandler; private ErrorHandler _subscriberErrorHandler; private AvailableImageHandler _availableImageHandler; private UnavailableImageHandler _unavailableImageHandler; private AvailableCounterHandler _availableCounterHandler; private UnavailableCounterHandler _unavailableCounterHandler; private Action _closeHandler; private long _keepAliveIntervalNs = Configuration.KeepaliveIntervalNs; private long _interServiceTimeoutNs; private long _resourceLingerDurationNs = Configuration.ResourceLingerDurationNs(); private long _closeLingerDurationNs = Configuration.CloseLingerDurationNs(); private FileInfo _cncFile; private string _aeronDirectoryName = GetAeronDirectoryName(); private DirectoryInfo _aeronDirectory; private long _driverTimeoutMs = 10000L; private MappedByteBuffer _cncByteBuffer; private UnsafeBuffer _cncMetaDataBuffer; private UnsafeBuffer _countersMetaDataBuffer; private UnsafeBuffer _countersValuesBuffer; private IThreadFactory _threadFactory = (IThreadFactory)new DefaultThreadFactory(); private int _isConcluded; public const string AERON_DIR_PROP_NAME = "aeron.dir"; public static readonly string AERON_DIR_PROP_DEFAULT; public const string IPC_MEDIA = "ipc"; public const string UDP_MEDIA = "udp"; public const string IPC_CHANNEL = "aeron:ipc"; public const string UDP_CHANNEL = "aeron:udp"; public const string SPY_PREFIX = "aeron-spy:"; public const string ENDPOINT_PARAM_NAME = "endpoint"; public const string INTERFACE_PARAM_NAME = "interface"; public const string DEBUG_TIMEOUT_PROP_NAME = "aeron.debug.timeout"; public const long DRIVER_TIMEOUT_MS = 10000L; public const int NULL_SESSION_ID = -1; public const string INITIAL_TERM_ID_PARAM_NAME = "init-term-id"; public const string TERM_ID_PARAM_NAME = "term-id"; public const string TERM_OFFSET_PARAM_NAME = "term-offset"; public const string TERM_LENGTH_PARAM_NAME = "term-length"; public const string MTU_LENGTH_PARAM_NAME = "mtu"; public const string TTL_PARAM_NAME = 
"ttl"; public const string MDC_CONTROL_PARAM_NAME = "control"; public const string MDC_CONTROL_MODE_PARAM_NAME = "control-mode"; public const string MTU_LENGTH_URI_PARAM_NAME = "mtu"; public const string MDC_CONTROL_MODE = "control-mode"; public const string MDC_CONTROL_MODE_MANUAL = "manual"; public const string MDC_CONTROL_MODE_DYNAMIC = "dynamic"; public const string SESSION_ID_PARAM_NAME = "session-id"; public const string LINGER_PARAM_NAME = "linger"; public const string RELIABLE_STREAM_PARAM_NAME = "reliable"; public const string TAGS_PARAM_NAME = "tags"; public const string TAG_PREFIX = "tag:"; public const string SPARSE_PARAM_NAME = "sparse"; public const string ALIAS_PARAM_NAME = "alias"; public const string EOS_PARAM_NAME = "eos"; public const string TETHER_PARAM_NAME = "tether"; public const string GROUP_PARAM_NAME = "group"; public const string REJOIN_PARAM_NAME = "rejoin"; public const string CONGESTION_CONTROL_PARAM_NAME = "cc"; public const string FLOW_CONTROL_PARAM_NAME = "fc"; public const string GROUP_TAG_PARAM_NAME = "gtag"; public const string SPIES_SIMULATE_CONNECTION_PARAM_NAME = "ssc"; public const string SOCKET_SNDBUF_PARAM_NAME = "so-sndbuf"; public const string SOCKET_RCVBUF_PARAM_NAME = "so-rcvbuf"; public const string RECEIVER_WINDOW_LENGTH_PARAM_NAME = "rcv-wnd"; public const string MEDIA_RCV_TIMESTAMP_OFFSET_PARAM_NAME = "media-rcv-ts-offset"; public const string CHANNEL_RECEIVE_TIMESTAMP_OFFSET_PARAM_NAME = "channel-rcv-ts-offset"; public const string CHANNEL_SEND_TIMESTAMP_OFFSET_PARAM_NAME = "channel-snd-ts-offset"; public const string RESERVED_OFFSET = "reserved"; public const string FALLBACK_LOGGER_PROP_NAME = "aeron.fallback.logger"; private static readonly ConcurrentDictionary<string, bool> DebugFieldsSeen; public bool IsConcluded => 1 == _isConcluded; static Context() { DebugFieldsSeen = new ConcurrentDictionary<string, bool>(); string text = null; if (Environment.OSVersion.Platform == PlatformID.Unix && 
Directory.Exists("/dev/shm")) { text = "/dev/shm/aeron"; } if (text == null) { text = Path.Combine(Path.GetTempPath(), "aeron"); } AERON_DIR_PROP_DEFAULT = text + "-" + Environment.UserName; } public static TextWriter FallbackLogger() { string property = Config.GetProperty("aeron.fallback.logger", "stderr"); if (!(property == "stdout")) { if (!(property == "stderr")) { } return Console.Error; } return Console.Out; } public static string GetAeronDirectoryName() { return Config.GetProperty("aeron.dir", AERON_DIR_PROP_DEFAULT); } public Context ConcludeAeronDirectory() { if (_aeronDirectory == null) { _aeronDirectory = new DirectoryInfo(_aeronDirectoryName); } return this; } public Context Clone() { return (Context)MemberwiseClone(); } public Context Conclude() { //IL_0044: Unknown result type (might be due to invalid IL or missing references) //IL_004e: Expected O, but got Unknown //IL_00a4: Unknown result type (might be due to invalid IL or missing references) //IL_00ae: Expected O, but got Unknown //IL_00bc: Unknown result type (might be due to invalid IL or missing references) //IL_00c6: Expected O, but got Unknown //IL_0138: Unknown result type (might be due to invalid IL or missing references) //IL_0142: Expected O, but got Unknown //IL_015c: Unknown result type (might be due to invalid IL or missing references) //IL_0166: Expected O, but got Unknown //IL_0161: Unknown result type (might be due to invalid IL or missing references) //IL_016b: Expected O, but got Unknown if (Interlocked.Exchange(ref _isConcluded, 1) != 0) { throw new ConcurrentConcludeException(); } ConcludeAeronDirectory(); _cncFile = new FileInfo(Path.Combine(_aeronDirectory.FullName, "cnc.dat")); if (_clientLock == null) { _clientLock = (ILock)new ReentrantLock(); } else if (_clientLock is NoOpLock && !_useConductorAgentInvoker) { throw new AeronException("Must use Aeron.Context.UseConductorAgentInvoker(true) when Aeron.Context.ClientLock(...) 
is using a NoOpLock"); } if (_epochClock == null) { _epochClock = (IEpochClock)(object)SystemEpochClock.INSTANCE; } if (_nanoClock == null) { _nanoClock = (INanoClock)(object)SystemNanoClock.INSTANCE; } if (_idleStrategy == null) { _idleStrategy = (IIdleStrategy)new SleepingIdleStrategy(Configuration.IdleSleepMs); } if (_awaitingIdleStrategy == null) { _awaitingIdleStrategy = (IIdleStrategy)new SleepingIdleStrategy(Configuration.AWAITING_IDLE_SLEEP_MS); } if (CncFile() != null) { ConnectToDriver(); } _interServiceTimeoutNs = CncFileDescriptor.ClientLivenessTimeoutNs((IDirectBuffer)(object)_cncMetaDataBuffer); if (_interServiceTimeoutNs <= _keepAliveIntervalNs) { throw new ConfigurationException("interServiceTimeoutNs=" + _interServiceTimeoutNs + " <= keepAliveIntervalNs=" + _keepAliveIntervalNs); } if (_toDriverBuffer == null) { _toDriverBuffer = (IRingBuffer)new ManyToOneRingBuffer((IAtomicBuffer)(object)CncFileDescriptor.CreateToDriverBuffer(_cncByteBuffer, (IDirectBuffer)(object)_cncMetaDataBuffer)); } if (_toClientBuffer == null) { _toClientBuffer = new CopyBroadcastReceiver(new BroadcastReceiver((IAtomicBuffer)(object)CncFileDescriptor.CreateToClientsBuffer(_cncByteBuffer, (IDirectBuffer)(object)_cncMetaDataBuffer))); } if (CountersMetaDataBuffer() == null) { CountersMetaDataBuffer(CncFileDescriptor.CreateCountersMetaDataBuffer(_cncByteBuffer, (IDirectBuffer)(object)_cncMetaDataBuffer)); } if (CountersValuesBuffer() == null) { CountersValuesBuffer(CncFileDescriptor.CreateCountersValuesBuffer(_cncByteBuffer, (IDirectBuffer)(object)_cncMetaDataBuffer)); } if (_logBuffersFactory == null) { _logBuffersFactory = new MappedLogBuffersFactory(); } if (_errorHandler == null) { _errorHandler = Configuration.DEFAULT_ERROR_HANDLER; } if (_subscriberErrorHandler == null) { _subscriberErrorHandler = _errorHandler; } if (_availableImageHandler == null) { _availableImageHandler = delegate { }; } if (_unavailableImageHandler == null) { _unavailableImageHandler = delegate { }; 
} if (_driverProxy == null) { _clientId = _toDriverBuffer.NextCorrelationId(); _driverProxy = new DriverProxy(ToDriverBuffer(), _clientId); } return this; } public long ClientId() { return _clientId; } public FileInfo CncFile() { return _cncFile; } public Context UseConductorAgentInvoker(bool useConductorAgentInvoker) { _useConductorAgentInvoker = useConductorAgentInvoker; return this; } public bool UseConductorAgentInvoker() { return _useConductorAgentInvoker; } public Context PreTouchMappedMemory(bool preTouchMappedMemory) { _preTouchMappedMemory = preTouchMappedMemory; return this; } public bool PreTouchMappedMemory() { return _preTouchMappedMemory; } public Context DriverAgentInvoker(AgentInvoker driverAgentInvoker) { _driverAgentInvoker = driverAgentInvoker; return this; } public AgentInvoker DriverAgentInvoker() { return _driverAgentInvoker; } public Context ClientLock(ILock @lock) { _clientLock = @lock; return this; } public ILock ClientLock() { return _clientLock; } public Context EpochClock(IEpochClock clock) { _epochClock = clock; return this; } public IEpochClock EpochClock() { return _epochClock; } public Context NanoClock(INanoClock clock) { _nanoClock = clock; return this; } public INanoClock NanoClock() { return _nanoClock; } public Context IdleStrategy(IIdleStrategy idleStrategy) { _idleStrategy = idleStrategy; return this; } public IIdleStrategy IdleStrategy() { return _idleStrategy; } public Context AwaitingIdleStrategy(IIdleStrategy idleStrategy) { _awaitingIdleStrategy = idleStrategy; return this; } public IIdleStrategy AwaitingIdleStrategy() { return _awaitingIdleStrategy; } internal Context ToClientBuffer(CopyBroadcastReceiver toClientBuffer) { _toClientBuffer = toClientBuffer; return this; } public CopyBroadcastReceiver ToClientBuffer() { return _toClientBuffer; } public IRingBuffer ToDriverBuffer() { return _toDriverBuffer; } internal Context DriverProxy(DriverProxy driverProxy) { _driverProxy = driverProxy; return this; } public DriverProxy 
DriverProxy() { return _driverProxy; } internal Context LogBuffersFactory(ILogBuffersFactory logBuffersFactory) { _logBuffersFactory = logBuffersFactory; return this; } public ILogBuffersFactory LogBuffersFactory() { return _logBuffersFactory; } public Context ErrorHandler(ErrorHandler errorHandler) { _errorHandler = errorHandler; return this; } public ErrorHandler ErrorHandler() { return _errorHandler; } public Context SubscriberErrorHandler(ErrorHandler errorHandler) { _subscriberErrorHandler = errorHandler; return this; } public ErrorHandler SubscriberErrorHandler() { return _subscriberErrorHandler; } public Context AvailableImageHandler(AvailableImageHandler handler) { _availableImageHandler = handler; return this; } public AvailableImageHandler AvailableImageHandler() { return _availableImageHandler; } public Context UnavailableImageHandler(UnavailableImageHandler handler) { _unavailableImageHandler = handler; return this; } public UnavailableImageHandler UnavailableImageHandler() { return _unavailableImageHandler; } public Context AvailableCounterHandler(AvailableCounterHandler handler) { _availableCounterHandler = handler; return this; } public AvailableCounterHandler AvailableCounterHandler() { return _availableCounterHandler; } public Context UnavailableCounterHandler(UnavailableCounterHandler handler) { _unavailableCounterHandler = handler; return this; } public UnavailableCounterHandler UnavailableCounterHandler() { return _unavailableCounterHandler; } public Context CloseHandler(Action handler) { _closeHandler = handler; return this; } public Action CloseHandler() { return _closeHandler; } public UnsafeBuffer CountersMetaDataBuffer() { return _countersMetaDataBuffer; } public Context CountersMetaDataBuffer(UnsafeBuffer countersMetaDataBuffer) { _countersMetaDataBuffer = countersMetaDataBuffer; return this; } public UnsafeBuffer CountersValuesBuffer() { return _countersValuesBuffer; } public Context CountersValuesBuffer(UnsafeBuffer countersValuesBuffer) 
{
    _countersValuesBuffer = countersValuesBuffer;
    return this;
}

/// <summary>Sets the keep-alive interval (nanoseconds) held by this context.</summary>
/// <returns>This context, for call chaining.</returns>
public Context KeepAliveIntervalNs(long value)
{
    _keepAliveIntervalNs = value;
    return this;
}

/// <summary>Gets the keep-alive interval (nanoseconds) held by this context.</summary>
public long KeepAliveIntervalNs() => _keepAliveIntervalNs;

/// <summary>Sets the driver timeout in milliseconds.</summary>
/// <returns>This context, for call chaining.</returns>
public Context DriverTimeoutMs(long driverTimeoutMs)
{
    _driverTimeoutMs = driverTimeoutMs;
    return this;
}

/// <summary>
/// Gets the driver timeout in milliseconds, passed through <see cref="CheckDebugTimeout"/>
/// so that a debug override can replace it.
/// </summary>
public long DriverTimeoutMs() => CheckDebugTimeout(_driverTimeoutMs, TimeUnit.MILLIS, "DriverTimeoutMs");

/// <summary>
/// Returns <paramref name="timeout"/> unless the "aeron.debug.timeout" property is set and a
/// debugger is attached; in that case the property is parsed as a duration and converted to
/// <paramref name="timeUnit"/>.
/// </summary>
/// <param name="timeout">The configured timeout, returned when no override applies.</param>
/// <param name="timeUnit">Unit the result is converted to.</param>
/// <param name="debugFieldName">Name used to report the override; it is reported once per name.</param>
/// <returns>The override value, or <paramref name="timeout"/> when there is no usable override.</returns>
public static long CheckDebugTimeout(long timeout, TimeUnit timeUnit, string debugFieldName)
{
    string configuredValue = Config.GetProperty("aeron.debug.timeout");
    bool overrideActive = configuredValue != null && Debugger.IsAttached;
    if (!overrideActive)
    {
        return timeout;
    }

    long effectiveTimeout;
    try
    {
        long parsedDuration = SystemUtil.ParseDuration("aeron.debug.timeout", configuredValue);
        effectiveTimeout = timeUnit.Convert(parsedDuration, TimeUnit.NANOSECONDS);

        // TryAdd succeeds only the first time a field name is seen, so the message is printed once.
        bool firstUseOfField = DebugFieldsSeen.TryAdd(debugFieldName, true);
        if (firstUseOfField)
        {
            Console.WriteLine("Using debug timeout [" + effectiveTimeout + "] for " + debugFieldName + " replacing [" + timeout + "]");
        }
    }
    catch (FormatException)
    {
        // An unparsable property value falls back to the configured timeout.
        effectiveTimeout = timeout;
    }

    return effectiveTimeout;
}

/// <summary>Sets the inter-service timeout in nanoseconds.</summary>
/// <returns>This context, for call chaining.</returns>
internal Context InterServiceTimeoutNs(long interServiceTimeout)
{
    _interServiceTimeoutNs = interServiceTimeout;
    return this;
}

/// <summary>
/// Gets the inter-service timeout in nanoseconds, passed through <see cref="CheckDebugTimeout"/>
/// so that a debug override can replace it.
/// </summary>
public long InterServiceTimeoutNs() => CheckDebugTimeout(_interServiceTimeoutNs, TimeUnit.NANOSECONDS, "InterServiceTimeoutNs");

/// <summary>Sets the resource linger duration in nanoseconds.</summary>
/// <returns>This context, for call chaining.</returns>
public Context ResourceLingerDurationNs(long resourceLingerDurationNs)
{
    _resourceLingerDurationNs = resourceLingerDurationNs;
    return this;
}

/// <summary>Gets the resource linger duration in nanoseconds.</summary>
public long ResourceLingerDurationNs() => _resourceLingerDurationNs;

/// <summary>Sets the close linger duration in nanoseconds.</summary>
/// <returns>This context, for call chaining.</returns>
public Context CloseLingerDurationNs(long closeLingerDurationNs)
{
    _closeLingerDurationNs = closeLingerDurationNs;
    return this;
}

/// <summary>Gets the close linger duration in nanoseconds.</summary>
public long CloseLingerDurationNs() => _closeLingerDurationNs;

/// <summary>Gets the configured Aeron directory name.</summary>
public string AeronDirectoryName() => _aeronDirectoryName;

/// <summary>Gets the Aeron directory; this is null until it has been concluded.</summary>
public DirectoryInfo AeronDirectory() => _aeronDirectory;

/// <summary>Sets the Aeron directory name.</summary>
/// <returns>This context, for call chaining.</returns>
public Context AeronDirectoryName(string dirName)
{
    _aeronDirectoryName = dirName;
    return this;
}

public Context ThreadFactory(IThreadFactory
threadFactory)
{
    _threadFactory = threadFactory;
    return this;
}

/// <summary>Gets the thread factory held by this context.</summary>
public IThreadFactory ThreadFactory()
{
    return _threadFactory;
}

/// <summary>
/// Releases the CnC mapping and the buffers created over it.
/// </summary>
/// <remarks>
/// The mapped CnC buffer is disposed first and its field cleared; the metadata and counters
/// buffers are then disposed but their fields are left set, so the accessors keep returning
/// the same instances after disposal.
/// </remarks>
public void Dispose()
{
    _cncByteBuffer?.Dispose();
    _cncByteBuffer = null;

    _cncMetaDataBuffer?.Dispose();
    _countersMetaDataBuffer?.Dispose();
    _countersValuesBuffer?.Dispose();

    // The previous implementation re-read _cncByteBuffer here and disposed it a second time;
    // the field is always null by this point, so that branch could never run and was removed.
}

/// <summary>Builds a multi-line dump of this context's fields.</summary>
public override string ToString()
{
    return "Aeron.Context\n{\n isConcluded=" + _isConcluded
        + "\n aeronDirectory=" + AeronDirectory()?.ToString()
        + "\n aeronDirectoryName='" + AeronDirectoryName()
        + "'\n cncFile=" + CncFile()?.ToString()
        + "\n countersMetaDataBuffer=" + ((object)CountersMetaDataBuffer())?.ToString()
        + "\n countersValuesBuffer=" + ((object)CountersValuesBuffer())?.ToString()
        + "\n driverTimeoutMs=" + DriverTimeoutMs()
        + "\n clientId=" + _clientId
        + "\n useConductorAgentInvoker=" + _useConductorAgentInvoker
        + "\n preTouchMappedMemory=" + _preTouchMappedMemory
        + "\n driverAgentInvoker=" + ((object)_driverAgentInvoker)?.ToString()
        + "\n clientLock=" + ((object)_clientLock)?.ToString()
        + "\n epochClock=" + ((object)_epochClock)?.ToString()
        + "\n nanoClock=" + ((object)_nanoClock)?.ToString()
        + "\n idleStrategy=" + ((object)_idleStrategy)?.ToString()
        + "\n awaitingIdleStrategy=" + ((object)_awaitingIdleStrategy)?.ToString()
        + "\n toClientBuffer=" + ((object)_toClientBuffer)?.ToString()
        + "\n toDriverBuffer=" + ((object)_toDriverBuffer)?.ToString()
        + "\n driverProxy=" + _driverProxy?.ToString()
        + "\n cncByteBuffer=" + ((object)_cncByteBuffer)?.ToString()
        + "\n cncMetaDataBuffer=" +
((object)_cncMetaDataBuffer)?.ToString() + "\n logBuffersFactory=" + _logBuffersFactory?.ToString() + "\n errorHandler=" + ((object)_errorHandler)?.ToString() + "\n subscriberErrorHandler=" + ((object)_subscriberErrorHandler)?.ToString() + "\n availableImageHandler=" + _availableImageHandler?.ToString() + "\n unavailableImageHandler=" + _unavailableImageHandler?.ToString() + "\n availableCounterHandler=" + _availableCounterHandler?.ToString() + "\n unavailableCounterHandler=" + _unavailableCounterHandler?.ToString() + "\n closeHandler=" + _closeHandler?.ToString() + "\n keepAliveIntervalNs=" + _keepAliveIntervalNs + "\n interServiceTimeoutNs=" + _interServiceTimeoutNs + "\n resourceLingerDurationNs=" + _resourceLingerDurationNs + "\n closeLingerDurationNs=" + _closeLingerDurationNs + "\n threadFactory=" + ((object)_threadFactory)?.ToString() + "\n}"; } private void ConnectToDriver() { //IL_0117: Unknown result type (might be due to invalid IL or missing references) //IL_011d: Expected O, but got Unknown long num = _epochClock.Time() + DriverTimeoutMs(); FileInfo fileInfo = CncFile(); while (_toDriverBuffer == null) { fileInfo.Refresh(); _cncByteBuffer = WaitForFileMapping(fileInfo, _epochClock, num); _cncMetaDataBuffer = CncFileDescriptor.CreateMetaDataBuffer(_cncByteBuffer); int intVolatile; while ((intVolatile = _cncMetaDataBuffer.GetIntVolatile(CncFileDescriptor.CncVersionOffset(0))) == 0) { if (_epochClock.Time() > num) { throw new DriverTimeoutException("CnC file is created but not initialised"); } Sleep(Configuration.AWAITING_IDLE_SLEEP_MS); } CncFileDescriptor.CheckVersion(intVolatile); if (SemanticVersion.Minor(intVolatile) < SemanticVersion.Minor(CncFileDescriptor.CNC_VERSION)) { throw new AeronException("driverVersion=" + SemanticVersion.ToString(intVolatile) + " insufficient for clientVersion=" + SemanticVersion.ToString(CncFileDescriptor.CNC_VERSION)); } if (!CncFileDescriptor.IsCncFileLengthSufficient((IDirectBuffer)(object)_cncMetaDataBuffer, 
_cncByteBuffer.Capacity))
        {
            MappedByteBuffer cncByteBuffer = _cncByteBuffer;
            if (cncByteBuffer != null)
            {
                cncByteBuffer.Dispose();
            }
            _cncByteBuffer = null;
            _cncMetaDataBuffer = null;
            Sleep(Configuration.AWAITING_IDLE_SLEEP_MS);
            continue;
        }
        ManyToOneRingBuffer val = new ManyToOneRingBuffer((IAtomicBuffer)(object)CncFileDescriptor.CreateToDriverBuffer(_cncByteBuffer, (IDirectBuffer)(object)_cncMetaDataBuffer));
        while (val.ConsumerHeartbeatTime() == 0L)
        {
            if (_epochClock.Time() > num)
            {
                throw new DriverTimeoutException("no driver heartbeat detected.");
            }
            Sleep(Configuration.AWAITING_IDLE_SLEEP_MS);
        }
        long num2 = _epochClock.Time();
        if (val.ConsumerHeartbeatTime() < num2 - DriverTimeoutMs())
        {
            if (num2 > num)
            {
                throw new DriverTimeoutException("no driver heartbeat detected.");
            }
            IoUtil.Unmap(_cncByteBuffer);
            _cncByteBuffer = null;
            _cncMetaDataBuffer = null;
            Sleep(100);
        }
        else
        {
            _toDriverBuffer = (IRingBuffer)(object)val;
        }
    }
}

/// <summary>
/// Waits until the CnC file exists and holds at least the metadata section, then maps it.
/// </summary>
/// <param name="cncFile">The CnC file to open; it is refreshed while waiting for it to appear.</param>
/// <param name="clock">Clock compared against <paramref name="deadLineMs"/>.</param>
/// <param name="deadLineMs">Clock value after which waiting is abandoned.</param>
/// <returns>The mapping returned by <c>IoUtil.MapExistingFile</c>.</returns>
/// <exception cref="DriverTimeoutException">The file did not appear before the deadline.</exception>
/// <exception cref="AeronException">
/// Any non-I/O failure while opening or mapping; this includes the "not populated" timeout,
/// which is raised inside the guarded region and is therefore wrapped.
/// </exception>
private static MappedByteBuffer WaitForFileMapping(FileInfo cncFile, IEpochClock clock, long deadLineMs)
{
    while (true)
    {
        if (!cncFile.Exists || cncFile.Length <= 0)
        {
            if (clock.Time() > deadLineMs)
            {
                break;
            }
            Sleep(Configuration.IdleSleepMs);
            cncFile.Refresh();
            continue;
        }

        FileStream fileStream = null;
        bool handedToMapping = false;
        try
        {
            fileStream = cncFile.Open(FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite | FileShare.Delete);
            if (fileStream.Length < CncFileDescriptor.META_DATA_LENGTH)
            {
                // Close the handle before either sleeping or timing out; previously the
                // timeout path threw with the stream still open.
                fileStream.Dispose();
                if (clock.Time() > deadLineMs)
                {
                    throw new DriverTimeoutException("CnC file is created but not populated.");
                }
                Sleep(Configuration.IdleSleepMs);
                continue;
            }

            MappedByteBuffer mapping = IoUtil.MapExistingFile(fileStream);
            // On success the stream is intentionally left open, as before.
            // NOTE(review): presumably the returned buffer owns the stream - confirm in IoUtil.
            handedToMapping = true;
            return mapping;
        }
        catch (FileNotFoundException)
        {
            // File vanished between the existence check and the open: retry.
        }
        catch (IOException)
        {
            // Transient I/O failure: retry.
        }
        catch (Exception innerException)
        {
            throw new AeronException("cannot open CnC file", innerException);
        }
        finally
        {
            // Release the handle on every path that did not return a mapping.
            if (!handedToMapping)
            {
                fileStream?.Dispose();
            }
        }
    }

    throw new DriverTimeoutException("CnC file not created: " + cncFile.FullName);
}

/// <summary>Deletes the Aeron directory via <c>IoUtil.Delete</c>.</summary>
public void DeleteAeronDirectory()
{
    IoUtil.Delete(_aeronDirectory, false);
}
/// <summary>
/// Maps the "cnc.dat" file in the Aeron directory if it exists and is longer than the
/// metadata section.
/// </summary>
/// <param name="logProgress">Optional sink for a progress message; may be null.</param>
/// <returns>The mapped buffer, or null when the file is absent or too short.</returns>
public MappedByteBuffer MapExistingCncFile(Action<string> logProgress)
{
    FileInfo cncFile = new FileInfo(Path.Combine(_aeronDirectory.FullName, "cnc.dat"));
    if (cncFile.Exists && cncFile.Length > CncFileDescriptor.END_OF_METADATA_OFFSET)
    {
        // The message previously ran the file name straight into "exists"; the space makes it
        // match the message logged by the static IsDriverActive overload.
        logProgress?.Invoke("INFO: Aeron CnC file " + cncFile + " exists");
        return IoUtil.MapExistingFile(cncFile, "cnc.dat");
    }

    return null;
}

/// <summary>
/// Checks whether a driver is active for the given Aeron directory by inspecting its CnC file.
/// </summary>
/// <param name="directory">Directory expected to contain "cnc.dat".</param>
/// <param name="driverTimeoutMs">Maximum heartbeat age, in milliseconds, still considered active.</param>
/// <param name="logger">Optional sink for informational messages; may be null.</param>
/// <returns>True when a sufficiently recent driver heartbeat is found; false otherwise.</returns>
public static bool IsDriverActive(DirectoryInfo directory, long driverTimeoutMs, Action<string> logger)
{
    FileInfo cncFile = new FileInfo(Path.Combine(directory.FullName, "cnc.dat"));
    if (cncFile.Exists && cncFile.Length > CncFileDescriptor.END_OF_METADATA_OFFSET)
    {
        logger?.Invoke("INFO: Aeron CnC file " + cncFile + " exists");
        MappedByteBuffer cncByteBuffer = IoUtil.MapExistingFile(cncFile, "CnC file");
        try
        {
            return IsDriverActive(driverTimeoutMs, logger, cncByteBuffer);
        }
        finally
        {
            cncByteBuffer?.Dispose();
        }
    }

    return false;
}

/// <summary>
/// Checks whether a driver is active for this context's Aeron directory.
/// </summary>
/// <param name="driverTimeoutMs">Maximum heartbeat age, in milliseconds, still considered active.</param>
/// <param name="logger">Optional sink for informational messages; may be null.</param>
/// <returns>True when a sufficiently recent driver heartbeat is found; false otherwise.</returns>
public bool IsDriverActive(long driverTimeoutMs, Action<string> logger)
{
    MappedByteBuffer cncByteBuffer = MapExistingCncFile(logger);
    try
    {
        return IsDriverActive(driverTimeoutMs, logger, cncByteBuffer);
    }
    finally
    {
        cncByteBuffer?.Dispose();
    }
}

/// <summary>
/// Checks whether a driver is active using an already mapped CnC buffer.
/// </summary>
/// <param name="driverTimeoutMs">
/// Maximum heartbeat age, in milliseconds, still considered active; also bounds the wait for
/// the CnC version field to become non-zero.
/// </param>
/// <param name="logger">Optional sink for informational messages; may be null.</param>
/// <param name="cncByteBuffer">Mapped CnC file, or null.</param>
/// <returns>False when the buffer is null or the heartbeat is older than the timeout.</returns>
/// <exception cref="DriverTimeoutException">The CnC version stayed zero past the timeout.</exception>
public static bool IsDriverActive(long driverTimeoutMs, Action<string> logger, MappedByteBuffer cncByteBuffer)
{
    if (cncByteBuffer == null)
    {
        return false;
    }

    UnsafeBuffer cncMetaDataBuffer = CncFileDescriptor.CreateMetaDataBuffer(cncByteBuffer);
    long startTimeMs = UnixTimeConverter.CurrentUnixTimeMillis();

    int cncVersion;
    while ((cncVersion = cncMetaDataBuffer.GetIntVolatile(CncFileDescriptor.CncVersionOffset(0))) == 0)
    {
        if (UnixTimeConverter.CurrentUnixTimeMillis() > startTimeMs + driverTimeoutMs)
        {
            throw new DriverTimeoutException("CnC file is created but not initialised.");
        }
        Sleep(1);
    }

    CncFileDescriptor.CheckVersion(cncVersion);

    ManyToOneRingBuffer toDriverBuffer = new ManyToOneRingBuffer((IAtomicBuffer)(object)CncFileDescriptor.CreateToDriverBuffer(cncByteBuffer, (IDirectBuffer)(object)cncMetaDataBuffer));
    long heartbeatTimeMs = toDriverBuffer.ConsumerHeartbeatTime();

    // Use the same millisecond clock as the wait loop above. The previous code subtracted the
    // heartbeat from DateTime.Now.ToFileTimeUtc(), which counts 100 ns ticks since 1601, so the
    // computed "age" was always far larger than any timeout and the driver never looked active.
    long heartbeatAgeMs = UnixTimeConverter.CurrentUnixTimeMillis() - heartbeatTimeMs;

    logger?.Invoke("INFO: Aeron toDriver consumer heartbeat is (ms):" + heartbeatAgeMs);
    return heartbeatAgeMs <= driverTimeoutMs;
}

/// <summary>
/// Sends a terminate-driver command through the CnC file found in <paramref name="directory"/>.
/// </summary>
/// <param name="directory">Directory expected to contain "cnc.dat".</param>
/// <param name="tokenBuffer">Buffer holding the termination token.</param>
/// <param name="tokenOffset">Offset of the token within <paramref name="tokenBuffer"/>.</param>
/// <param name="tokenLength">Length of the token.</param>
/// <returns>
/// The result of <c>DriverProxy.TerminateDriver</c>, or false when the file is absent, too
/// short, or its version field is not positive.
/// </returns>
public static bool RequestDriverTermination(DirectoryInfo directory, IDirectBuffer tokenBuffer, int tokenOffset, int tokenLength)
{
    FileInfo cncFile = new FileInfo(Path.Combine(directory.FullName, "cnc.dat"));
    if (cncFile.Exists && cncFile.Length > CncFileDescriptor.END_OF_METADATA_OFFSET)
    {
        MappedByteBuffer cncByteBuffer = IoUtil.MapExistingFile(cncFile, "CnC file");
        try
        {
            UnsafeBuffer cncMetaDataBuffer = CncFileDescriptor.CreateMetaDataBuffer(cncByteBuffer);
            int cncVersion = cncMetaDataBuffer.GetIntVolatile(CncFileDescriptor.CncVersionOffset(0));
            if (cncVersion > 0)
            {
                CncFileDescriptor.CheckVersion(cncVersion);
                ManyToOneRingBuffer toDriverBuffer = new ManyToOneRingBuffer((IAtomicBuffer)(object)CncFileDescriptor.CreateToDriverBuffer(cncByteBuffer, (IDirectBuffer)(object)cncMetaDataBuffer));
                long clientId = toDriverBuffer.NextCorrelationId();
                return new DriverProxy((IRingBuffer)toDriverBuffer, clientId).TerminateDriver(tokenBuffer, tokenOffset, tokenLength);
            }
        }
        finally
        {
            cncByteBuffer?.Dispose();
        }
    }

    return false;
}

/// <summary>
/// Writes the error log from this context's CnC file to <paramref name="writer"/>.
/// </summary>
/// <returns>The number of distinct errors written; 0 when no CnC file could be mapped.</returns>
public int SaveErrorLog(StreamWriter writer)
{
    MappedByteBuffer cncByteBuffer = MapExistingCncFile(null);
    try
    {
        return SaveErrorLog(writer, cncByteBuffer);
    }
    finally
    {
        cncByteBuffer?.Dispose();
    }
}

/// <summary>
/// Writes the error log from an already mapped CnC buffer to <paramref name="writer"/>.
/// </summary>
/// <returns>The number of distinct errors written; 0 when <paramref name="cncByteBuffer"/> is null.</returns>
public int SaveErrorLog(StreamWriter writer, MappedByteBuffer cncByteBuffer)
{
    if (cncByteBuffer == null)
    {
        return 0;
    }

    return PrintErrorLog(ErrorLogBuffer(cncByteBuffer), writer);
}

public static int PrintErrorLog(IAtomicBuffer errorBuffer, TextWriter @out)
{
    //IL_001f: Unknown result type (might be due to invalid IL or missing references)
    //IL_0029: Expected O, but got Unknown
    int num = 0;
    if (ErrorLogReader.HasErrors(errorBuffer))
    {
        num =
ErrorLogReader.Read(errorBuffer, new ErrorConsumer(ErrorConsumer)); @out.WriteLine(); @out.WriteLine("{0} distinct errors observed.", num); } return num; void ErrorConsumer(int count, long firstTimestamp, long lastTimestamp, string encodedException) { DateTime dateTime = new DateTime(firstTimestamp); DateTime dateTime2 = new DateTime(lastTimestamp); @out.WriteLine(); @out.WriteLine($"{count} observations from {dateTime} to {dateTime2} for:"); @out.WriteLine(encodedException); } } private static IAtomicBuffer ErrorLogBuffer(MappedByteBuffer cncByteBuffer) { UnsafeBuffer val = CncFileDescriptor.CreateMetaDataBuffer(cncByteBuffer); CncFileDescriptor.CheckVersion(val.GetInt(CncFileDescriptor.CncVersionOffset(0))); return (IAtomicBuffer)(object)CncFileDescriptor.CreateErrorLogBuffer(cncByteBuffer, (IDirectBuffer)(object)val); } public static ErrorHandler SetupErrorHandler(ErrorHandler userErrorHandler, DistinctErrorLog errorLog) { //IL_000f: Unknown result type (might be due to invalid IL or missing references) //IL_0019: Expected O, but got Unknown //IL_003b: Unknown result type (might be due to invalid IL or missing references) //IL_0041: Expected O, but got Unknown //IL_002e: Unknown result type (might be due to invalid IL or missing references) //IL_0034: Expected O, but got Unknown LoggingErrorHandler loggingErrorHandler = new LoggingErrorHandler(errorLog); if (userErrorHandler == null) { LoggingErrorHandler obj = loggingErrorHandler; return new ErrorHandler(obj.OnError); } return (ErrorHandler)delegate(Exception throwable) { loggingErrorHandler.OnError(throwable); userErrorHandler.Invoke(throwable); }; } } public const int NULL_VALUE = -1; private readonly AtomicBoolean _isClosed = new AtomicBoolean(false); private readonly long _clientId; private readonly ClientConductor _conductor; private readonly IRingBuffer _commandBuffer; private readonly AgentInvoker _conductorInvoker; private readonly AgentRunner _conductorRunner; private readonly Context _ctx; public bool 
IsClosed => _isClosed.Get();

        /// <summary>The context this client was constructed with.</summary>
        public Context Ctx => _ctx;

        /// <summary>The client id read from the context during construction.</summary>
        public long ClientId => _clientId;

        /// <summary>The conductor invoker, or null when the client runs the conductor on an <see cref="AgentRunner"/>.</summary>
        public AgentInvoker ConductorAgentInvoker => _conductorInvoker;

        /// <summary>The conductor's counters reader.</summary>
        /// <exception cref="AeronException">if the client has been closed.</exception>
        public CountersReader CountersReader
        {
            get
            {
                if (!_isClosed.Get())
                {
                    return _conductor.CountersReader();
                }

                throw new AeronException("client is closed");
            }
        }

        /// <summary>
        /// Concludes the context and wires up the conductor with either an invoker or a runner,
        /// depending on <c>ctx.UseConductorAgentInvoker()</c>. On any failure other than a
        /// <see cref="ConcurrentConcludeException"/> the context is quietly disposed before rethrowing.
        /// </summary>
        internal Aeron(Context ctx)
        {
            try
            {
                ctx.Conclude();

                _ctx = ctx;
                _clientId = ctx.ClientId();
                _commandBuffer = ctx.ToDriverBuffer();
                _conductor = new ClientConductor(ctx, this);

                if (!ctx.UseConductorAgentInvoker())
                {
                    _conductorInvoker = null;
                    _conductorRunner = new AgentRunner(ctx.IdleStrategy(), ctx.ErrorHandler(), (AtomicCounter)null, (IAgent)(object)_conductor);
                }
                else
                {
                    _conductorInvoker = new AgentInvoker(ctx.ErrorHandler(), (AtomicCounter)null, (IAgent)(object)_conductor);
                    _conductorRunner = null;
                }
            }
            catch (Exception ex) when (!(ex is ConcurrentConcludeException))
            {
                CloseHelper.QuietDispose((IDisposable)ctx);
                throw;
            }
        }

        /// <summary>Connects using a default-constructed <see cref="Context"/>.</summary>
        public static Aeron Connect()
        {
            return Connect(new Context());
        }

        /// <summary>
        /// Creates a client for <paramref name="ctx"/> and starts its conductor: the invoker is started
        /// in place, otherwise the runner is started on a thread from <c>ctx.ThreadFactory()</c>.
        /// On any failure other than a <see cref="ConcurrentConcludeException"/> the context is disposed.
        /// </summary>
        public static Aeron Connect(Context ctx)
        {
            try
            {
                var client = new Aeron(ctx);

                if (!ctx.UseConductorAgentInvoker())
                {
                    AgentRunner.StartOnThread(client._conductorRunner, ctx.ThreadFactory());
                }
                else
                {
                    client.ConductorAgentInvoker.Start();
                }

                return client;
            }
            catch (Exception ex) when (!(ex is ConcurrentConcludeException))
            {
                ctx.Dispose();
                throw;
            }
        }

        /// <summary>Writes one line per counter (id, value, label) to <paramref name="out"/>.</summary>
        public void PrintCounters(StreamWriter @out)
        {
            CounterConsumer printer = (value, id, label) => @out.WriteLine("{0,3}: {1:} - {2}", id, value, label);
            CountersReader.ForEach(printer);
        }

        public bool IsCommandActive(long correlationId) => _conductor.IsCommandActive(correlationId);

        public bool HasActiveCommands() => _conductor.HasActiveCommands();

        /// <summary>
        /// Closes the client once: the first caller to flip the closed flag disposes whichever of the
        /// conductor runner or invoker is in use, routing errors to the context's error handler.
        /// </summary>
        public void Dispose()
        {
            if (!_isClosed.CompareAndSet(false, true))
            {
                return;
            }

            ErrorHandler errorHandler = _ctx.ErrorHandler();
            IDisposable conductorOwner = _conductorRunner != null
                ? (IDisposable)_conductorRunner
                : (IDisposable)_conductorInvoker;

            CloseHelper.Dispose(errorHandler, conductorOwner);
        }

        public Publication AddPublication(string channel, int streamId) => _conductor.AddPublication(channel, streamId);

        public ExclusivePublication AddExclusivePublication(string channel, int streamId) => _conductor.AddExclusivePublication(channel, streamId);

        public long AsyncAddPublication(string channel, int streamId) => _conductor.AsyncAddPublication(channel, streamId);

        public void AsyncRemovePublication(long registrationId) => _conductor.RemovePublication(registrationId);

        public long AsyncAddExclusivePublication(string channel, int streamId) => _conductor.AsyncAddExclusivePublication(channel, streamId);

        public ConcurrentPublication GetPublication(long registrationId) => _conductor.GetPublication(registrationId);

        public ExclusivePublication GetExclusivePublication(long registrationId) => _conductor.GetExclusivePublication(registrationId);

        public Subscription AddSubscription(string channel, int streamId) => _conductor.AddSubscription(channel, streamId);

        public Subscription AddSubscription(string channel, int streamId, AvailableImageHandler availableImageHandler, UnavailableImageHandler unavailableImageHandler)
            => _conductor.AddSubscription(channel, streamId, availableImageHandler, unavailableImageHandler);

        /// <summary>Next correlation id from the to-driver command buffer.</summary>
        /// <exception cref="AeronException">if the client has been closed.</exception>
        public long NextCorrelationId()
        {
            if (!_isClosed.Get())
            {
                return _commandBuffer.NextCorrelationId();
            }

            throw new AeronException("client is closed");
        }

        public Counter AddCounter(int typeId, IDirectBuffer keyBuffer, int keyOffset, int keyLength, IDirectBuffer labelBuffer, int labelOffset, int labelLength)
            => _conductor.AddCounter(typeId, keyBuffer, keyOffset, keyLength, labelBuffer, labelOffset, labelLength);

        public Counter AddCounter(int typeId, string label) => _conductor.AddCounter(typeId, label);

        public long AddAvailableCounterHandler(AvailableCounterHandler handler) => _conductor.AddAvailableCounterHandler(handler);

        public bool RemoveAvailableCounterHandler(long registrationId) => _conductor.RemoveAvailableCounterHandler(registrationId);

        [Obsolete]
        public bool RemoveAvailableCounterHandler(AvailableCounterHandler handler) => _conductor.RemoveAvailableCounterHandler(handler);

        public long AddUnavailableCounterHandler(UnavailableCounterHandler handler) => _conductor.AddUnavailableCounterHandler(handler);

        public bool RemoveUnavailableCounterHandler(long registrationId) => _conductor.RemoveUnavailableCounterHandler(registrationId);

        [Obsolete]
        public bool RemoveUnavailableCounterHandler(UnavailableCounterHandler handler) => _conductor.RemoveUnavailableCounterHandler(handler);

        public long AddCloseHandler(Action handler) => _conductor.AddCloseHandler(handler);

        [Obsolete]
        public bool RemoveCloseHandler(Action handler) => _conductor.RemoveCloseHandler(handler);

        public bool RemoveCloseHandler(long registrationId) => _conductor.RemoveCloseHandler(registrationId);

        /// <summary>Marks the client closed without disposing the conductor.</summary>
        internal void InternalClose()
        {
            _isClosed.Set(true);
        }

        /// <summary>
        /// Sleeps for <paramref name="durationMs"/>; an interrupt is re-asserted on the current thread
        /// and surfaced as an <see cref="AeronException"/>.
        /// </summary>
        private static void Sleep(int durationMs)
        {
            try
            {
                Thread.Sleep(durationMs);
            }
            catch (ThreadInterruptedException interrupted)
            {
                Thread.CurrentThread.Interrupt();
                throw new AeronException("unexpected interrupt", interrupted);
            }
        }
    }

    /// <summary>Counter type id constants and helpers for validating a counter's type id.</summary>
    public static class AeronCounters
    {
        public const int DRIVER_SYSTEM_COUNTER_TYPE_ID = 0;
        public const int DRIVER_PUBLISHER_LIMIT_TYPE_ID = 1;
        public const int DRIVER_SENDER_POSITION_TYPE_ID = 2;
        public const int DRIVER_RECEIVER_HWM_TYPE_ID = 3;
        public const int DRIVER_SUBSCRIBER_POSITION_TYPE_ID = 4;
        public const int DRIVER_RECEIVER_POS_TYPE_ID = 5;
        public const int DRIVER_SEND_CHANNEL_STATUS_TYPE_ID = 6;
        public const int DRIVER_RECEIVE_CHANNEL_STATUS_TYPE_ID = 7;
        public const int DRIVER_SENDER_LIMIT_TYPE_ID = 9;
        public const int DRIVER_PER_IMAGE_TYPE_ID = 10;
        public const int DRIVER_HEARTBEAT_TYPE_ID = 11;
        public const int DRIVER_PUBLISHER_POS_TYPE_ID = 12;
        public const int DRIVER_SENDER_BPE_TYPE_ID = 13;
        public const int NAME_RESOLVER_NEIGHBORS_COUNTER_TYPE_ID = 15;
        public const int NAME_RESOLVER_CACHE_ENTRIES_COUNTER_TYPE_ID = 16;
        public const int DRIVER_LOCAL_SOCKET_ADDRESS_STATUS_TYPE_ID = 14;
        public const int FLOW_CONTROL_RECEIVERS_COUNTER_TYPE_ID = 17;
        public const int MDC_DESTINATIONS_COUNTER_TYPE_ID = 18;

        public const int ARCHIVE_RECORDING_POSITION_TYPE_ID = 100;
        public const int ARCHIVE_ERROR_COUNT_TYPE_ID = 101;
        public const int ARCHIVE_CONTROL_SESSIONS_TYPE_ID = 102;
        public const int ARCHIVE_MAX_CYCLE_TIME_TYPE_ID = 103;
        public const int ARCHIVE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID = 104;

        public const int CLUSTER_CONSENSUS_MODULE_STATE_TYPE_ID = 200;
        public const int CLUSTER_NODE_ROLE_TYPE_ID = 201;
        public const int CLUSTER_CONTROL_TOGGLE_TYPE_ID = 202;
        public const int CLUSTER_COMMIT_POSITION_TYPE_ID = 203;
        public const int CLUSTER_RECOVERY_STATE_TYPE_ID = 204;
        public const int CLUSTER_SNAPSHOT_COUNTER_TYPE_ID = 205;
        public const int CLUSTER_ELECTION_STATE_TYPE_ID = 207;
        public const int CLUSTER_BACKUP_STATE_TYPE_ID = 208;
        public const int CLUSTER_BACKUP_LIVE_LOG_POSITION_TYPE_ID = 209;
        public const int CLUSTER_BACKUP_QUERY_DEADLINE_TYPE_ID = 210;
        public const int CLUSTER_BACKUP_ERROR_COUNT_TYPE_ID = 211;
        public const int CLUSTER_CONSENSUS_MODULE_ERROR_COUNT_TYPE_ID = 212;
        public const int CLUSTER_CLIENT_TIMEOUT_COUNT_TYPE_ID = 213;
        public const int CLUSTER_INVALID_REQUEST_COUNT_TYPE_ID = 214;
        public const int CLUSTER_CLUSTERED_SERVICE_ERROR_COUNT_TYPE_ID = 215;
        public const int
CLUSTER_MAX_CYCLE_TIME_TYPE_ID = 216;
        public const int CLUSTER_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID = 217;
        public const int CLUSTER_CLUSTERED_SERVICE_MAX_CYCLE_TIME_TYPE_ID = 218;
        public const int CLUSTER_CLUSTERED_SERVICE_CYCLE_TIME_THRESHOLD_EXCEEDED_TYPE_ID = 219;
        public const int CLUSTER_STANDBY_STATE_TYPE_ID = 220;
        public const int CLUSTER_STANDBY_ERROR_COUNT_TYPE_ID = 221;
        public const int CLUSTER_STANDBY_HEARTBEAT_RESPONSE_COUNT_TYPE_ID = 222;

        /// <summary>
        /// Checks that the type id recorded for <paramref name="counterId"/> equals
        /// <paramref name="expectedCounterTypeId"/>.
        /// </summary>
        /// <exception cref="ConfigurationException">if the type ids differ.</exception>
        public static void ValidateCounterTypeId(CountersReader countersReader, int counterId, int expectedCounterTypeId)
        {
            int actualTypeId = countersReader.GetCounterTypeId(counterId);
            if (actualTypeId == expectedCounterTypeId)
            {
                return;
            }

            throw new ConfigurationException("The type for counterId=" + counterId + ", typeId=" + actualTypeId + " does not match the expected=" + expectedCounterTypeId);
        }

        /// <summary>Same check, looking the counter up through the client's counters reader.</summary>
        public static void ValidateCounterTypeId(Aeron aeron, Counter counter, int expectedCounterTypeId)
        {
            int counterId = ((AtomicCounter)counter).Id;
            ValidateCounterTypeId(aeron.CountersReader, counterId, expectedCounterTypeId);
        }
    }

    /// <summary>Out-of-line throw helper for <see cref="AeronException"/>; both methods are marked NoInlining.</summary>
    public class AeronThrowHelper
    {
        [MethodImpl(MethodImplOptions.NoInlining)]
        public static void ThrowAeronException(string message)
        {
            throw GetAeronException(message);
        }

        [MethodImpl(MethodImplOptions.NoInlining)]
        private static AeronException GetAeronException(string message)
        {
            return new AeronException(message);
        }
    }

    public delegate void AvailableCounterHandler(CountersReader countersReader, long registrationId, int counterId);

    /// <summary>
    /// Growable byte buffer with an append position (<see cref="Limit()"/>) and a stored
    /// next-term-offset value. Backed by an <see cref="UnsafeBuffer"/> wrapping a byte array.
    /// </summary>
    public sealed class BufferBuilder
    {
        internal const int MAX_CAPACITY = 2147483639;
        internal const int INIT_MIN_CAPACITY = 4096;

        private readonly UnsafeBuffer _buffer;
        private int _limit;
        private int _nextTermOffset;

        /// <summary>Creates a builder with zero initial capacity.</summary>
        public BufferBuilder() : this(0)
        {
        }

        /// <summary>Creates a builder backed by a new array of <paramref name="initialCapacity"/> bytes.</summary>
        /// <exception cref="ArgumentException">if the capacity is negative or above <see cref="MAX_CAPACITY"/>.</exception>
        public BufferBuilder(int initialCapacity)
        {
            bool inRange = initialCapacity >= 0 && initialCapacity <= MAX_CAPACITY;
            if (!inRange)
            {
                throw new ArgumentException("initialCapacity outside range 0 - " + MAX_CAPACITY + ": initialCapacity=" + initialCapacity);
            }

            _buffer = new UnsafeBuffer(new byte[initialCapacity]);
        }

        public int Capacity() => _buffer.Capacity;

        public int Limit() => _limit;

        /// <summary>Sets the limit; values below zero or at/above the current capacity are rejected.</summary>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Limit(int limit)
        {
            // NOTE(review): limit == capacity is rejected even though Append can leave _limit == capacity.
            if (limit < 0 || limit >= _buffer.Capacity)
            {
                ThrowHelper.ThrowArgumentException($"limit outside range: capacity={_buffer.Capacity:D} limit={limit:D}");
            }

            _limit = limit;
        }

        public int NextTermOffset() => _nextTermOffset;

        public void NextTermOffset(int offset)
        {
            _nextTermOffset = offset;
        }

        public IMutableDirectBuffer Buffer() => (IMutableDirectBuffer)(object)_buffer;

        /// <summary>Resets limit and next-term-offset to zero; capacity is untouched.</summary>
        public BufferBuilder Reset()
        {
            _nextTermOffset = 0;
            _limit = 0;
            return this;
        }

        /// <summary>Re-allocates the backing array to max(<see cref="INIT_MIN_CAPACITY"/>, limit) bytes.</summary>
        public BufferBuilder Compact()
        {
            int compactedCapacity = Math.Max(INIT_MIN_CAPACITY, _limit);
            Resize(compactedCapacity);
            return this;
        }

        /// <summary>Copies <paramref name="length"/> bytes from the source at the current limit, growing if needed.</summary>
        public BufferBuilder Append(IDirectBuffer srcBuffer, int srcOffset, int length)
        {
            EnsureCapacity(length);
            srcBuffer.GetBytes(srcOffset, (IMutableDirectBuffer)(object)_buffer, _limit, length);
            _limit += length;
            return this;
        }

        // Grows the backing array when limit + additionalLength exceeds the current capacity.
        private void EnsureCapacity(int additionalLength)
        {
            int currentCapacity = _buffer.Capacity;
            long required = (long)_limit + (long)additionalLength;
            if (required <= currentCapacity)
            {
                return;
            }

            if (required > MAX_CAPACITY)
            {
                throw new InvalidOperationException("insufficient capacity: maxCapacity=" + MAX_CAPACITY + " limit=" + _limit + " additionalLength=" + additionalLength);
            }

            Resize(FindSuitableCapacity(currentCapacity, required));
        }

        private void Resize(int newCapacity)
        {
            byte[] resized = CopyOf(_buffer.ByteArray, newCapacity);
            _buffer.Wrap(resized);
        }

        // Returns a new array of newLength holding as many leading elements of original as fit.
        private static T[] CopyOf<T>(T[] original, int newLength)
        {
            var copy = new T[newLength];
            int count = Math.Min(original.Length, newLength);
            Array.Copy(original, 0, copy, 0, count);
            return copy;
        }

        /// <summary>
        /// Starting from max(capacity, <see cref="INIT_MIN_CAPACITY"/>), grows by 50% per step
        /// (capped at <see cref="MAX_CAPACITY"/>) until <paramref name="requiredCapacity"/> is covered.
        /// </summary>
        internal static int FindSuitableCapacity(int capacity, long requiredCapacity)
        {
            long candidate = Math.Max(capacity, INIT_MIN_CAPACITY);
            while (candidate < requiredCapacity)
            {
                candidate = Math.Min(candidate + (candidate >> 1), (long)MAX_CAPACITY);
            }

            return (int)candidate;
        }
    }

    public sealed class ChannelUri
{
        // Parser states used by Parse.
        private enum State
        {
            MEDIA,
            PARAMS_KEY,
            PARAMS_VALUE
        }

        public const string AERON_SCHEME = "aeron";
        public const string SPY_QUALIFIER = "aeron-spy";
        public const long INVALID_TAG = -1L;

        private const int CHANNEL_TAG_INDEX = 0;
        private const int ENTITY_TAG_INDEX = 1;

        private static readonly string AERON_PREFIX = "aeron:";

        private string _prefix;
        private string _media;
        private readonly Map<string, string> _params;
        private readonly string[] _tags;

        public bool IsUdp => "udp".Equals(_media);

        public bool IsIpc => "ipc".Equals(_media);

        /// <summary>
        /// Creates a URI from its parts. The tags array is derived once, here, from the "tags"
        /// parameter present in <paramref name="params"/> at construction time.
        /// </summary>
        public ChannelUri(string prefix, string media, Map<string, string> @params)
        {
            _prefix = prefix;
            _media = media;
            _params = @params;
            _tags = SplitTags(_params.Get("tags"));
        }

        public string Prefix() => _prefix;

        public ChannelUri Prefix(string prefix)
        {
            _prefix = prefix;
            return this;
        }

        public string Media() => _media;

        /// <summary>Sets the media after validating it is "ipc" or "udp".</summary>
        public ChannelUri Media(string media)
        {
            ValidateMedia(media);
            _media = media;
            return this;
        }

        public string Scheme() => AERON_SCHEME;

        public string Get(string key) => _params.Get(key);

        /// <summary>Parameter value for <paramref name="key"/>, or <paramref name="defaultValue"/> when it is null.</summary>
        public string Get(string key, string defaultValue)
        {
            return _params.Get(key) ?? defaultValue;
        }

        public string Put(string key, string value) => _params.Put(key, value);

        public string Remove(string key) => _params.Remove(key);

        public bool ContainsKey(string key) => _params.ContainsKey(key);

        /// <summary>First tag, or null when there are no tags.</summary>
        public string ChannelTag()
        {
            return _tags != null && _tags.Length > CHANNEL_TAG_INDEX ? _tags[CHANNEL_TAG_INDEX] : null;
        }

        /// <summary>Second tag, or null when fewer than two tags are present.</summary>
        public string EntityTag()
        {
            return _tags != null && _tags.Length > ENTITY_TAG_INDEX ? _tags[ENTITY_TAG_INDEX] : null;
        }

        // Tags are compared element by element: comparing the arrays themselves is a reference
        // comparison, which made two URIs parsed from the same tagged string unequal.
        private bool Equals(ChannelUri other)
        {
            return !ReferenceEquals(other, null)
                && _prefix == other._prefix
                && _media == other._media
                && object.Equals(_params, other._params)
                && TagsEqual(_tags, other._tags);
        }

        /// <summary>
        /// Value equality over prefix, media, params and tag contents.
        /// Params are compared with <c>object.Equals</c>, so the result depends on the Map's own Equals.
        /// </summary>
        public override bool Equals(object obj)
        {
            if (ReferenceEquals(this, obj))
            {
                return true;
            }

            return obj is ChannelUri other && Equals(other);
        }

        /// <summary>Hash consistent with <see cref="Equals(object)"/>: tags contribute by content, not by array identity.</summary>
        public override int GetHashCode()
        {
            unchecked
            {
                int hash = _prefix != null ? _prefix.GetHashCode() : 0;
                hash = (hash * 397) ^ (_media != null ? _media.GetHashCode() : 0);
                hash = (hash * 397) ^ (_params != null ? ((object)_params).GetHashCode() : 0);

                if (_tags != null)
                {
                    foreach (string tag in _tags)
                    {
                        hash = (hash * 397) ^ (tag != null ? tag.GetHashCode() : 0);
                    }
                }

                return hash;
            }
        }

        /// <summary>Renders "[prefix:]aeron:media[?k=v|k=v...]".</summary>
        public override string ToString()
        {
            StringBuilder sb;
            if (string.IsNullOrEmpty(_prefix))
            {
                sb = new StringBuilder(_params.Count * 20 + 10);
            }
            else
            {
                sb = new StringBuilder(_params.Count * 20 + 20);
                sb.Append(_prefix);
                if (!_prefix.EndsWith(":"))
                {
                    sb.Append(":");
                }
            }

            sb.Append(AERON_PREFIX).Append(_media);

            if (_params.Count > 0)
            {
                sb.Append('?');
                foreach (KeyValuePair<string, string> param in _params)
                {
                    sb.Append(param.Key).Append('=').Append(param.Value).Append('|');
                }

                // Drop the trailing '|' appended after the last pair.
                sb.Length--;
            }

            return sb.ToString();
        }

        /// <summary>
        /// Stores "init-term-id", "term-id", "term-offset" and "term-length" parameters derived
        /// from <paramref name="position"/>.
        /// </summary>
        /// <exception cref="ArgumentException">if the position is negative or not a multiple of 32.</exception>
        public void InitialPosition(long position, int initialTermId, int termLength)
        {
            if (position < 0 || (position & 0x1F) != 0L)
            {
                throw new ArgumentException("invalid position: " + position);
            }

            int bitsToShift = LogBufferDescriptor.PositionBitsToShift(termLength);
            int termId = LogBufferDescriptor.ComputeTermIdFromPosition(position, bitsToShift, initialTermId);
            int termOffset = (int)(position & (termLength - 1));

            Put("init-term-id", Convert.ToString(initialTermId));
            Put("term-id", Convert.ToString(termId));
            Put("term-offset", Convert.ToString(termOffset));
            Put("term-length", Convert.ToString(termLength));
        }

        /// <summary>
        /// Parses "[aeron-spy:]aeron:media[?key=value|key=value...]" into a <see cref="ChannelUri"/>.
        /// </summary>
        /// <exception cref="ArgumentException">
        /// if the "aeron:" prefix is missing, a reserved character appears in the media, a key is
        /// empty or unterminated, or a media with no parameters is not "ipc"/"udp".
        /// </exception>
        public static ChannelUri Parse(string cs)
        {
            int position = 0;
            string prefix;
            if (StartsWith(cs, 0, "aeron-spy:"))
            {
                prefix = "aeron-spy";
                position = "aeron-spy:".Length;
            }
            else
            {
                prefix = "";
            }

            if (!StartsWith(cs, position, AERON_PREFIX))
            {
                throw new ArgumentException("Aeron URIs must start with 'aeron:', found: " + cs);
            }

            position += AERON_PREFIX.Length;

            var token = new StringBuilder();
            var parameters = new Map<string, string>((string)null);
            string media = null;
            string key = null;
            State state = State.MEDIA;

            int i = position;
            for (int length = cs.Length; i < length; i++)
            {
                char c = cs[i];
                switch (state)
                {
                    case State.MEDIA:
                        switch (c)
                        {
                            case '?':
                                media = token.ToString();
                                token.Length = 0;
                                state = State.PARAMS_KEY;
                                break;

                            case ':':
                            case '=':
                            case '|':
                                throw new ArgumentException("encountered '" + c + "' within media definition at index " + i + " in " + cs);

                            default:
                                token.Append(c);
                                break;
                        }

                        break;

                    case State.PARAMS_KEY:
                        switch (c)
                        {
                            case '=':
                                if (token.Length == 0)
                                {
                                    throw new ArgumentException("empty key not allowed at index " + i + " in " + cs);
                                }

                                key = token.ToString();
                                token.Length = 0;
                                state = State.PARAMS_VALUE;
                                break;

                            case '|':
                                throw new ArgumentException("invalid end of key at index " + i + " in " + cs);

                            default:
                                token.Append(c);
                                break;
                        }

                        break;

                    case State.PARAMS_VALUE:
                        if (c == '|')
                        {
                            parameters.Put(key, token.ToString());
                            token.Length = 0;
                            state = State.PARAMS_KEY;
                        }
                        else
                        {
                            token.Append(c);
                        }

                        break;

                    default:
                        throw new ArgumentException("unexpected state=" + state.ToString() + " in " + cs);
                }
            }

            // Input must end either in the media (no params) or inside a parameter value.
            switch (state)
            {
                case State.MEDIA:
                    media = token.ToString();
                    ValidateMedia(media);
                    break;

                case State.PARAMS_VALUE:
                    parameters.Put(key, token.ToString());
                    break;

                default:
                    throw new ArgumentException("no more input found, state=" + state.ToString() + " in " + cs);
            }

            return new ChannelUri(prefix, media, parameters);
        }

        /// <summary>Returns <paramref name="channel"/> with its "session-id" parameter set to <paramref name="sessionId"/>.</summary>
        public static string AddSessionId(string channel, int sessionId)
        {
            ChannelUri uri = Parse(channel);
            uri.Put("session-id", Convert.ToString(sessionId));
            return uri.ToString();
        }

        /// <summary>True when the value starts with "tag:".</summary>
        public static bool IsTagged(string paramValue)
        {
            return StartsWith(paramValue, 0, "tag:");
        }

        /// <summary>Numeric tag following the "tag:" prefix, or <see cref="INVALID_TAG"/> when the value is not tagged.</summary>
        public static long GetTag(string paramValue)
        {
            if (!IsTagged(paramValue))
            {
                return INVALID_TAG;
            }

            return long.Parse(paramValue.Substring(4, paramValue.Length - 4));
        }

        /// <summary>
        /// Builds "aeron:media?endpoint=..." from the channel's media, carrying over its "interface"
        /// parameter when present.
        /// </summary>
        public static string CreateDestinationUri(string channel, string endpoint)
        {
            ChannelUri uri = Parse(channel);
            string destination = AERON_PREFIX + uri.Media() + "?endpoint=" + endpoint;
            string networkInterface = uri.Get("interface");

            return networkInterface != null
                ? destination + "|interface=" + networkInterface
                : destination;
        }

        /// <summary>
        /// Sets "endpoint" to <paramref name="resolvedEndpoint"/> when absent, or swaps a trailing
        /// ":0" port on the existing endpoint for the resolved endpoint's port.
        /// </summary>
        /// <exception cref="ArgumentNullException">if <paramref name="resolvedEndpoint"/> is null.</exception>
        /// <exception cref="ArgumentException">if it has no port or itself ends with ":0".</exception>
        public void ReplaceEndpointWildcardPort(string resolvedEndpoint)
        {
            if (resolvedEndpoint == null)
            {
                throw new ArgumentNullException("resolvedEndpoint", "resolvedEndpoint is null");
            }

            int portSeparator = resolvedEndpoint.LastIndexOf(':');
            if (-1 == portSeparator)
            {
                throw new ArgumentException("No port specified on resolvedEndpoint=" + resolvedEndpoint);
            }

            if (resolvedEndpoint.EndsWith(":0", StringComparison.Ordinal))
            {
                throw new ArgumentException("Wildcard port specified on resolvedEndpoint=" + resolvedEndpoint);
            }

            string existing = Get("endpoint");
            if (existing == null)
            {
                Put("endpoint", resolvedEndpoint);
            }
            else if (existing.EndsWith(":0", StringComparison.Ordinal))
            {
                string host = existing.Substring(0, existing.Length - 2);
                Put("endpoint", host + resolvedEndpoint.Substring(portSeparator));
            }
        }

        private static void ValidateMedia(string media)
        {
            if ("ipc".Equals(media) || "udp".Equals(media))
            {
                return;
            }

            throw new ArgumentException("unknown media: " + media);
        }

        // Ordinal check that prefix occurs in input starting at position.
        private static bool StartsWith(string input, int position, string prefix)
        {
            if (input.Length - position < prefix.Length)
            {
                return false;
            }

            for (int i = 0; i < prefix.Length; i++)
            {
                if (input[position + i] != prefix[i])
                {
                    return false;
                }
            }

            return true;
        }

        // Element-wise, null-tolerant comparison of two tag arrays.
        private static bool TagsEqual(string[] left, string[] right)
        {
            if (ReferenceEquals(left, right))
            {
                return true;
            }

            if (left == null || right == null)
            {
                return false;
            }

            return left.SequenceEqual(right);
        }

        // Splits the comma-separated "tags" value; null yields the shared empty array.
        private static string[] SplitTags(string tagsValue)
        {
            return tagsValue != null ? tagsValue.Split(',') : ArrayUtil.EMPTY_STRING_ARRAY;
        }
    }

    public sealed class ChannelUriStringBuilder
    {
        public const string TAG_PREFIX = "tag:";

        private readonly StringBuilder _sb = new StringBuilder(64);
        private string _prefix;
        private string _media;
        private string _endpoint;
        private string _networkInterface;
        private string _controlEndpoint;
        private string _controlMode;
        private string _tags;
        private string _alias;
        private string _cc;
        private string _fc;
        private bool? _reliable;
        private int? _ttl;
        private int? _mtu;
        private int? _termLength;
        private int? _initialTermId;
        private int? _termId;
        private int? _termOffset;
        private long? _sessionId;
        private long? _groupTag;
        private long? _linger;
        private bool? _sparse;
        private bool? _eos;
        private bool? _tether;
        private bool? _group;
        private bool? _rejoin;
        private bool? _ssc;
        private bool _isSessionIdTagged;
        private int? _socketSndbufLength;
        private int? _socketRcvbufLength;
        private int?
_receiverWindowLength;
        private string _mediaReceiveTimestampOffset;
        private string _channelReceiveTimestampOffset;
        private string _channelSendTimestampOffset;

        /// <summary>Creates an empty builder.</summary>
        public ChannelUriStringBuilder()
        {
        }

        /// <summary>Creates a builder initialised from a parsed channel URI string.</summary>
        public ChannelUriStringBuilder(string initialUri) : this(ChannelUri.Parse(initialUri))
        {
        }

        /// <summary>Creates a builder by copying each supported parameter from <paramref name="channelUri"/>.</summary>
        public ChannelUriStringBuilder(ChannelUri channelUri)
        {
            _isSessionIdTagged = false;

            Prefix(channelUri);
            Media(channelUri);
            Endpoint(channelUri);
            NetworkInterface(channelUri);
            ControlEndpoint(channelUri);
            ControlMode(channelUri);
            Tags(channelUri);
            Alias(channelUri);
            CongestionControl(channelUri);
            FlowControl(channelUri);
            Reliable(channelUri);
            Ttl(channelUri);
            Mtu(channelUri);
            TermLength(channelUri);
            InitialTermId(channelUri);
            TermId(channelUri);
            TermOffset(channelUri);
            SessionId(channelUri);
            Group(channelUri);
            Linger(channelUri);
            Sparse(channelUri);
            Eos(channelUri);
            Tether(channelUri);
            GroupTag(channelUri);
            Rejoin(channelUri);
            SpiesSimulateConnection(channelUri);
            SocketRcvbufLength(channelUri);
            SocketSndbufLength(channelUri);
            ReceiverWindowLength(channelUri);
            MediaReceiveTimestampOffset(channelUri);
            ChannelReceiveTimestampOffset(channelUri);
            ChannelSendTimestampOffset(channelUri);
        }

        /// <summary>
        /// Resets every configured value so the builder can be reused. This includes the
        /// spies-simulate-connection flag, which was previously left set after a clear.
        /// </summary>
        public ChannelUriStringBuilder Clear()
        {
            _prefix = null;
            _media = null;
            _endpoint = null;
            _networkInterface = null;
            _controlEndpoint = null;
            _controlMode = null;
            _tags = null;
            _alias = null;
            _cc = null;
            _fc = null;
            _reliable = null;
            _ttl = null;
            _mtu = null;
            _termLength = null;
            _initialTermId = null;
            _termId = null;
            _termOffset = null;
            _sessionId = null;
            _groupTag = null;
            _linger = null;
            _sparse = null;
            _eos = null;
            _tether = null;
            _group = null;
            _rejoin = null;
            _ssc = null;
            _isSessionIdTagged = false;
            _socketRcvbufLength = null;
            _socketSndbufLength = null;
            _receiverWindowLength = null;
            _mediaReceiveTimestampOffset = null;
            _channelReceiveTimestampOffset = null;
            _channelSendTimestampOffset = null;
            return this;
        }

        /// <summary>
        /// Checks the configured values are mutually consistent: media is set, UDP has an endpoint or
        /// control endpoint, and initialTermId/termId/termOffset are either all set or all unset.
        /// </summary>
        /// <exception cref="InvalidOperationException">if media is missing, or UDP has neither endpoint nor control.</exception>
        /// <exception cref="ArgumentException">if the term position values are partial or out of range.</exception>
        public ChannelUriStringBuilder Validate()
        {
            if (_media == null)
            {
                throw new InvalidOperationException("media type is mandatory");
            }

            if ("udp".Equals(_media) && _endpoint == null && _controlEndpoint == null)
            {
                throw new InvalidOperationException("either 'endpoint' or 'control' must be specified for UDP.");
            }

            int positionFieldCount = 0;
            positionFieldCount += _initialTermId.HasValue ? 1 : 0;
            positionFieldCount += _termId.HasValue ? 1 : 0;
            positionFieldCount += _termOffset.HasValue ? 1 : 0;

            if (positionFieldCount == 0)
            {
                return this;
            }

            if (positionFieldCount < 3)
            {
                throw new ArgumentException("if any of then a complete set of 'initialTermId', 'termId', and 'termOffset' must be provided");
            }

            // All three values are present from here on. The subtraction wraps on overflow, so a
            // negative result is how a span wider than 2^31 - 1 shows up.
            if (_termId.Value - _initialTermId.Value < 0)
            {
                throw new ArgumentException("difference greater than 2^31 - 1: termId=" + _termId.Value + " - initialTermId=" + _initialTermId.Value);
            }

            if (_termLength.HasValue && _termOffset.Value > _termLength.Value)
            {
                throw new ArgumentException("termOffset=" + _termOffset.Value + " > termLength=" + _termLength.Value);
            }

            return this;
        }

        /// <summary>Sets the prefix; only null, "" and "aeron-spy" are accepted.</summary>
        /// <exception cref="ArgumentException">for any other prefix.</exception>
        public ChannelUriStringBuilder Prefix(string prefix)
        {
            bool accepted = prefix == null || prefix.Equals("") || prefix.Equals("aeron-spy");
            if (!accepted)
            {
                throw new ArgumentException("invalid prefix: " + prefix);
            }

            _prefix = prefix;
            return this;
        }

        public ChannelUriStringBuilder Prefix(ChannelUri channelUri)
        {
            return Prefix(channelUri.Prefix());
        }

        public string Prefix() => _prefix;

        /// <summary>Sets the media; only "udp" and "ipc" are accepted.</summary>
        /// <exception cref="ArgumentException">for any other media, including null.</exception>
        public ChannelUriStringBuilder Media(string media)
        {
            if (media != "udp" && media != "ipc")
            {
                throw new ArgumentException("invalid media: " + media);
            }

            _media = media;
            return this;
        }

        public ChannelUriStringBuilder Media(ChannelUri channelUri)
        {
            return Media(channelUri.Media());
        }

        public string Media() => _media;

        public ChannelUriStringBuilder Endpoint(string endpoint)
        {
            _endpoint = endpoint;
            return this;
        }

        public ChannelUriStringBuilder Endpoint(ChannelUri channelUri)
        {
            return Endpoint(channelUri.Get("endpoint"));
        }

        public string Endpoint() => _endpoint;

        public ChannelUriStringBuilder NetworkInterface(string networkInterface)
        {
_networkInterface = networkInterface;
            return this;
        }

        public ChannelUriStringBuilder NetworkInterface(ChannelUri channelUri)
        {
            return NetworkInterface(channelUri.Get("interface"));
        }

        public string NetworkInterface() => _networkInterface;

        public ChannelUriStringBuilder ControlEndpoint(string controlEndpoint)
        {
            _controlEndpoint = controlEndpoint;
            return this;
        }

        public ChannelUriStringBuilder ControlEndpoint(ChannelUri channelUri)
        {
            return ControlEndpoint(channelUri.Get("control"));
        }

        public string ControlEndpoint() => _controlEndpoint;

        /// <summary>Sets the control mode; only null, "manual" and "dynamic" are accepted.</summary>
        /// <exception cref="ArgumentException">for any other value.</exception>
        public ChannelUriStringBuilder ControlMode(string controlMode)
        {
            bool accepted = controlMode == null || controlMode.Equals("manual") || controlMode.Equals("dynamic");
            if (!accepted)
            {
                throw new ArgumentException("invalid control mode: " + controlMode);
            }

            _controlMode = controlMode;
            return this;
        }

        public ChannelUriStringBuilder ControlMode(ChannelUri channelUri)
        {
            return ControlMode(channelUri.Get("control-mode"));
        }

        public string ControlMode() => _controlMode;

        public ChannelUriStringBuilder Reliable(bool? isReliable)
        {
            _reliable = isReliable;
            return this;
        }

        /// <summary>Copies "reliable" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder Reliable(ChannelUri channelUri)
        {
            string value = channelUri.Get("reliable");
            if (value == null)
            {
                _reliable = null;
                return this;
            }

            return Reliable(Convert.ToBoolean(value));
        }

        public bool? Reliable() => _reliable;

        /// <summary>Sets the TTL; null clears it.</summary>
        /// <exception cref="ArgumentException">if the value is outside 0-255.</exception>
        public ChannelUriStringBuilder Ttl(int? ttl)
        {
            if (ttl.HasValue && (ttl.Value < 0 || ttl.Value > 255))
            {
                throw new ArgumentException("TTL not in range 0-255: " + ttl.Value);
            }

            _ttl = ttl;
            return this;
        }

        /// <summary>Copies "ttl" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder Ttl(ChannelUri channelUri)
        {
            string value = channelUri.Get("ttl");
            if (value == null)
            {
                _ttl = null;
                return this;
            }

            return Ttl(Convert.ToInt32(value));
        }

        public int? Ttl() => _ttl;

        /// <summary>Sets the MTU; null clears it.</summary>
        /// <exception cref="ArgumentException">if the value is outside 32-65504 or not a multiple of 32.</exception>
        public ChannelUriStringBuilder Mtu(int? mtu)
        {
            if (mtu.HasValue)
            {
                int value = mtu.Value;
                if (value < 32 || value > 65504)
                {
                    throw new ArgumentException("MTU not in range 32-65504: " + value);
                }

                if ((value & 0x1F) != 0)
                {
                    throw new ArgumentException("MTU not a multiple of FRAME_ALIGNMENT: mtu=" + value);
                }
            }

            _mtu = mtu;
            return this;
        }

        /// <summary>Copies "mtu" from the URI (parsed as a size); an absent parameter clears the value.</summary>
        /// <exception cref="InvalidOperationException">if the parsed size exceeds <see cref="int.MaxValue"/>.</exception>
        public ChannelUriStringBuilder Mtu(ChannelUri channelUri)
        {
            string value = channelUri.Get("mtu");
            if (value == null)
            {
                _mtu = null;
                return this;
            }

            long parsed = SystemUtil.ParseSize("mtu", value);
            if (parsed > int.MaxValue)
            {
                throw new InvalidOperationException("mtu " + parsed + " > " + int.MaxValue);
            }

            return Mtu((int)parsed);
        }

        public int? Mtu() => _mtu;

        /// <summary>Sets the term length after checking it with <c>LogBufferDescriptor.CheckTermLength</c>; null clears it.</summary>
        public ChannelUriStringBuilder TermLength(int? termLength)
        {
            if (termLength.HasValue)
            {
                LogBufferDescriptor.CheckTermLength(termLength.Value);
            }

            _termLength = termLength;
            return this;
        }

        /// <summary>Copies "term-length" from the URI (parsed as a size); an absent parameter clears the value.</summary>
        /// <exception cref="InvalidOperationException">if the parsed size exceeds <see cref="int.MaxValue"/>.</exception>
        public ChannelUriStringBuilder TermLength(ChannelUri channelUri)
        {
            string value = channelUri.Get("term-length");
            if (value == null)
            {
                _termLength = null;
                return this;
            }

            long parsed = SystemUtil.ParseSize("term-length", value);
            if (parsed > int.MaxValue)
            {
                // Report the offending parsed value. The previous code printed the _termLength
                // field, which still holds the old (typically unset) value at this point.
                throw new InvalidOperationException("term length more than max length of " + 1073741824 + ": length=" + parsed);
            }

            return TermLength((int)parsed);
        }

        public int? TermLength() => _termLength;

        public ChannelUriStringBuilder InitialTermId(int? initialTermId)
        {
            _initialTermId = initialTermId;
            return this;
        }

        /// <summary>Copies "init-term-id" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder InitialTermId(ChannelUri channelUri)
        {
            string value = channelUri.Get("init-term-id");
            if (value == null)
            {
                _initialTermId = null;
                return this;
            }

            return InitialTermId(Convert.ToInt32(value));
        }

        public int? InitialTermId() => _initialTermId;

        public ChannelUriStringBuilder TermId(int? termId)
        {
            _termId = termId;
            return this;
        }

        /// <summary>Copies "term-id" from the URI; an absent parameter clears the value.</summary>
        public ChannelUriStringBuilder TermId(ChannelUri channelUri)
        {
            string value = channelUri.Get("term-id");
            if (value == null)
            {
                _termId = null;
                return this;
            }

            return TermId(Convert.ToInt32(value));
        }

        public int? TermId() => _termId;

        public ChannelUriStringBuilder TermOffset(int?
termOffset) { if (termOffset.HasValue) { if (termOffset < 0 || termOffset > 1073741824) { int? num = termOffset; throw new ArgumentException("term offset not in range 0-1g: " + num); } if (0 != (termOffset & 0x1F)) { int? num = termOffset; throw new ArgumentException("term offset not multiple of FRAME_ALIGNMENT: " + num); } } _termOffset = termOffset; return this; } public ChannelUriStringBuilder TermOffset(ChannelUri channelUri) { string text = channelUri.Get("term-offset"); if (text == null) { _termOffset = null; return this; } return TermOffset(Convert.ToInt32(text)); } public int? TermOffset() { return _termOffset; } public ChannelUriStringBuilder SessionId(int? sessionId) { _sessionId = sessionId; return this; } public ChannelUriStringBuilder SessionId(string sessionIdStr) { if (sessionIdStr != null) { if (ChannelUri.IsTagged(sessionIdStr)) { TaggedSessionId(ChannelUri.GetTag(sessionIdStr)); } else { IsSessionIdTagged(isSessionIdTagged: false); SessionId(Convert.ToInt32(sessionIdStr)); } } else { SessionId((int?)null); } return this; } public ChannelUriStringBuilder TaggedSessionId(long? sessionId) { IsSessionIdTagged(isSessionIdTagged: true); _sessionId = sessionId; return this; } public ChannelUriStringBuilder SessionId(ChannelUri channelUri) { return SessionId(channelUri.Get("session-id")); } [Obsolete("this method will not correctly handle tagged sessionId values that are outside the range of")] public int? SessionId() { return (int?)_sessionId; } public ChannelUriStringBuilder Linger(long? lingerNs) { if (lingerNs.HasValue && lingerNs < 0) { long? num = lingerNs; throw new ArgumentException("linger value cannot be negative: " + num); } _linger = lingerNs; return this; } public ChannelUriStringBuilder Linger(ChannelUri channelUri) { string text = channelUri.Get("linger"); if (text == null) { _linger = null; return this; } return Linger(SystemUtil.ParseDuration("linger", text)); } public long? 
Linger() { return _linger; } public ChannelUriStringBuilder Sparse(bool? isSparse) { _sparse = isSparse; return this; } public ChannelUriStringBuilder Sparse(ChannelUri channelUri) { string text = channelUri.Get("sparse"); if (text == null) { _sparse = null; return this; } return Sparse(Convert.ToBoolean(text)); } public bool? Sparse() { return _sparse; } public ChannelUriStringBuilder Eos(bool? eos) { _eos = eos; return this; } public ChannelUriStringBuilder Eos(ChannelUri channelUri) { string text = channelUri.Get("eos"); if (text == null) { _eos = null; return this; } return Eos(Convert.ToBoolean(text)); } public bool? Eos() { return _eos; } public ChannelUriStringBuilder Tether(bool? tether) { _tether = tether; return this; } public ChannelUriStringBuilder Tether(ChannelUri channelUri) { string text = channelUri.Get("tether"); if (text == null) { _tether = null; return this; } return Tether(Convert.ToBoolean(text)); } public bool? Tether() { return _tether; } public ChannelUriStringBuilder Group(bool? group) { _group = group; return this; } public ChannelUriStringBuilder Group(ChannelUri channelUri) { string text = channelUri.Get("group"); if (text == null) { _group = null; return this; } return Group(Convert.ToBoolean(text)); } public bool? Group() { return _group; } public ChannelUriStringBuilder Tags(string tags) { _tags = tags; return this; } public ChannelUriStringBuilder Tags(ChannelUri channelUri) { return Tags(channelUri.Get("tags")); } public ChannelUriStringBuilder Tags(long? channelTag, long? pubSubTag) { if (!channelTag.HasValue && pubSubTag.HasValue) { throw new ArgumentException("null == channelTag && null != pubSubTag"); } if (!channelTag.HasValue) { return Tags((string)null); } long? num = channelTag; string? 
text = num.ToString(); object obj; if (!pubSubTag.HasValue) { obj = ""; } else { num = pubSubTag; obj = "," + num; } return Tags(text + (string?)obj); } public string Tags() { return _tags; } public ChannelUriStringBuilder IsSessionIdTagged(bool isSessionIdTagged) { _isSessionIdTagged = isSessionIdTagged; return this; } public bool IsSessionIdTagged() { return _isSessionIdTagged; } public ChannelUriStringBuilder Alias(string alias) { _alias = alias; return this; } public ChannelUriStringBuilder Alias(ChannelUri channelUri) { return Alias(channelUri.Get("alias")); } public string Alias() { return _alias; } public ChannelUriStringBuilder CongestionControl(string congestionControl) { _cc = congestionControl; return this; } public ChannelUriStringBuilder CongestionControl(ChannelUri channelUri) { return CongestionControl(channelUri.Get("cc")); } public string CongestionControl() { return _cc; } public ChannelUriStringBuilder FlowControl(string flowControl) { _fc = flowControl; return this; } public ChannelUriStringBuilder TaggedFlowControl(long? groupTag, int? minGroupSize, string timeout) { string text = "tagged"; if (groupTag.HasValue || minGroupSize.HasValue) { text += ",g:"; if (groupTag.HasValue) { string text2 = text; long? num = groupTag; text = text2 + num; } if (minGroupSize.HasValue) { string text3 = text; int? num2 = minGroupSize; text = text3 + "/" + num2; } } if (timeout != null) { text = text + ",t:" + timeout; } return FlowControl(text); } public ChannelUriStringBuilder MinFlowControl(int? minGroupSize, string timeout) { string text = "min"; if (minGroupSize.HasValue) { string text2 = text; int? num = minGroupSize; text = text2 + ",g:/" + num; } if (timeout != null) { text = text + ",t:" + timeout; } return FlowControl(text); } public ChannelUriStringBuilder FlowControl(ChannelUri channelUri) { return FlowControl(channelUri.Get("fc")); } public string FlowControl() { return _fc; } public ChannelUriStringBuilder GroupTag(long? 
groupTag)
    {
        _groupTag = groupTag;
        return this;
    }

    /// <summary>Copies the "gtag" parameter from <paramref name="channelUri"/>, clearing it when absent.</summary>
    public ChannelUriStringBuilder GroupTag(ChannelUri channelUri)
    {
        string raw = channelUri.Get("gtag");
        _groupTag = raw == null ? (long?)null : Convert.ToInt64(raw);
        return this;
    }

    /// <summary>Current "gtag" value, or <c>null</c> when unset.</summary>
    public long? GroupTag() => _groupTag;

    /// <summary>Sets the "rejoin" parameter; <c>null</c> leaves it out of the built URI.</summary>
    /// <returns>This builder, for chaining.</returns>
    public ChannelUriStringBuilder Rejoin(bool? rejoin)
    {
        _rejoin = rejoin;
        return this;
    }

    /// <summary>Copies the "rejoin" parameter from <paramref name="channelUri"/>, clearing it when absent.</summary>
    public ChannelUriStringBuilder Rejoin(ChannelUri channelUri)
    {
        string raw = channelUri.Get("rejoin");
        _rejoin = raw == null ? (bool?)null : Convert.ToBoolean(raw);
        return this;
    }

    /// <summary>Current "rejoin" setting, or <c>null</c> when unset.</summary>
    public bool? Rejoin() => _rejoin;

    /// <summary>Sets the "ssc" parameter; <c>null</c> leaves it out of the built URI.</summary>
    /// <returns>This builder, for chaining.</returns>
    public ChannelUriStringBuilder SpiesSimulateConnection(bool? spiesSimulateConnection)
    {
        _ssc = spiesSimulateConnection;
        return this;
    }

    /// <summary>Copies the "ssc" parameter from <paramref name="channelUri"/>, clearing it when absent.</summary>
    public ChannelUriStringBuilder SpiesSimulateConnection(ChannelUri channelUri)
    {
        string raw = channelUri.Get("ssc");
        _ssc = raw == null ? (bool?)null : Convert.ToBoolean(raw);
        return this;
    }

    /// <summary>Current "ssc" setting, or <c>null</c> when unset.</summary>
    public bool? SpiesSimulateConnection() => _ssc;

    /// <summary>
    /// Derives initial term id, term id, term offset and term length from a stream position.
    /// </summary>
    /// <param name="position">Non-negative position that is a multiple of 32.</param>
    /// <param name="initialTermId">Initial term id stored as-is and used to compute the term id.</param>
    /// <param name="termLength">Term length stored as-is; also masks the position into a term offset.</param>
    /// <returns>This builder, for chaining.</returns>
    /// <exception cref="ArgumentException">The position is negative or not 32-byte aligned.</exception>
    public ChannelUriStringBuilder InitialPosition(long position, int initialTermId, int termLength)
    {
        const int frameAlignment = 32;

        if (position < 0)
        {
            throw new ArgumentException("invalid position=" + position + " < 0");
        }

        bool isAligned = (position & (frameAlignment - 1)) == 0L;
        if (!isAligned)
        {
            throw new ArgumentException("invalid position=" + position + " does not have frame alignment=" + frameAlignment);
        }

        // Computed before any field is touched so a rejected term length leaves the builder unchanged.
        int bitsToShift = LogBufferDescriptor.PositionBitsToShift(termLength);

        _initialTermId = initialTermId;
        _termId = LogBufferDescriptor.ComputeTermIdFromPosition(position, bitsToShift, initialTermId);
        _termOffset = (int)(position & (termLength - 1));
        _termLength = termLength;

        return this;
    }

    /// <summary>Sets the "so-sndbuf" parameter; <c>null</c> leaves it out of the built URI.</summary>
    public ChannelUriStringBuilder SocketSndbufLength(int?
socketSndbufLength)
    {
        _socketSndbufLength = socketSndbufLength;
        return this;
    }

    /// <summary>
    /// Copies the "so-sndbuf" parameter from <paramref name="channelUri"/>, clearing it when absent.
    /// </summary>
    /// <exception cref="InvalidOperationException">The parsed size is greater than <see cref="int.MaxValue"/>.</exception>
    public ChannelUriStringBuilder SocketSndbufLength(ChannelUri channelUri)
    {
        string raw = channelUri.Get("so-sndbuf");
        if (raw != null)
        {
            long parsed = SystemUtil.ParseSize("so-sndbuf", raw);
            if (parsed > int.MaxValue)
            {
                throw new InvalidOperationException("value exceeds maximum permitted: value=" + parsed);
            }

            return SocketSndbufLength((int)parsed);
        }

        _socketSndbufLength = null;
        return this;
    }

    /// <summary>Current "so-sndbuf" value, or <c>null</c> when unset.</summary>
    public int? SocketSndbufLength() => _socketSndbufLength;

    /// <summary>Sets the "so-rcvbuf" parameter; <c>null</c> leaves it out of the built URI.</summary>
    /// <returns>This builder, for chaining.</returns>
    public ChannelUriStringBuilder SocketRcvbufLength(int? socketRcvbufLength)
    {
        _socketRcvbufLength = socketRcvbufLength;
        return this;
    }

    /// <summary>
    /// Copies the "so-rcvbuf" parameter from <paramref name="channelUri"/>, clearing it when absent.
    /// </summary>
    /// <exception cref="InvalidOperationException">The parsed size is greater than <see cref="int.MaxValue"/>.</exception>
    public ChannelUriStringBuilder SocketRcvbufLength(ChannelUri channelUri)
    {
        string raw = channelUri.Get("so-rcvbuf");
        if (raw != null)
        {
            long parsed = SystemUtil.ParseSize("so-rcvbuf", raw);
            if (parsed > int.MaxValue)
            {
                throw new InvalidOperationException("value exceeds maximum permitted: value=" + parsed);
            }

            return SocketRcvbufLength((int)parsed);
        }

        _socketRcvbufLength = null;
        return this;
    }

    /// <summary>Current "so-rcvbuf" value, or <c>null</c> when unset.</summary>
    public int? SocketRcvbufLength() => _socketRcvbufLength;

    /// <summary>Sets the "rcv-wnd" parameter; <c>null</c> leaves it out of the built URI.</summary>
    /// <returns>This builder, for chaining.</returns>
    public ChannelUriStringBuilder ReceiverWindowLength(int? receiverWindowLength)
    {
        _receiverWindowLength = receiverWindowLength;
        return this;
    }

    /// <summary>
    /// Copies the "rcv-wnd" parameter from <paramref name="channelUri"/>, clearing it when absent.
    /// </summary>
    /// <exception cref="InvalidOperationException">The parsed size is greater than <see cref="int.MaxValue"/>.</exception>
    public ChannelUriStringBuilder ReceiverWindowLength(ChannelUri channelUri)
    {
        string raw = channelUri.Get("rcv-wnd");
        if (raw != null)
        {
            long parsed = SystemUtil.ParseSize("rcv-wnd", raw);
            if (parsed > int.MaxValue)
            {
                throw new InvalidOperationException("value exceeds maximum permitted: value=" + parsed);
            }

            return ReceiverWindowLength((int)parsed);
        }

        _receiverWindowLength = null;
        return this;
    }

    public int?
ReceiverWindowLength() { return _receiverWindowLength; }

    /// <summary>Current "media-rcv-ts-offset" value, or <c>null</c> when unset.</summary>
    public string MediaReceiveTimestampOffset()
    {
        return _mediaReceiveTimestampOffset;
    }

    /// <summary>
    /// Sets the "media-rcv-ts-offset" parameter. Accepts <c>null</c>, the literal "reserved",
    /// or text that parses as a 32-bit integer.
    /// </summary>
    /// <returns>This builder, for chaining.</returns>
    /// <exception cref="ArgumentException">
    /// The value is neither "reserved" nor an integer that fits in an <see cref="int"/>.
    /// </exception>
    public ChannelUriStringBuilder MediaReceiveTimestampOffset(string timestampOffset)
    {
        // TryParse rejects out-of-range digits as well as malformed text, so an oversized value is
        // reported as ArgumentException rather than escaping as OverflowException. The invariant
        // culture keeps validation of this machine-readable value independent of the host locale.
        if (timestampOffset != null && !"reserved".Equals(timestampOffset) &&
            !int.TryParse(
                timestampOffset,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out int _))
        {
            throw new ArgumentException("mediaReceiveTimestampOffset must be a number or the value 'reserved'");
        }

        _mediaReceiveTimestampOffset = timestampOffset;
        return this;
    }

    /// <summary>Copies the "media-rcv-ts-offset" parameter from <paramref name="channelUri"/>.</summary>
    public ChannelUriStringBuilder MediaReceiveTimestampOffset(ChannelUri channelUri)
    {
        return MediaReceiveTimestampOffset(channelUri.Get("media-rcv-ts-offset"));
    }

    /// <summary>Current "channel-rcv-ts-offset" value, or <c>null</c> when unset.</summary>
    public string ChannelReceiveTimestampOffset()
    {
        return _channelReceiveTimestampOffset;
    }

    /// <summary>
    /// Sets the "channel-rcv-ts-offset" parameter. Accepts <c>null</c>, the literal "reserved",
    /// or text that parses as a 32-bit integer.
    /// </summary>
    /// <returns>This builder, for chaining.</returns>
    /// <exception cref="ArgumentException">
    /// The value is neither "reserved" nor an integer that fits in an <see cref="int"/>.
    /// </exception>
    public ChannelUriStringBuilder ChannelReceiveTimestampOffset(string timestampOffset)
    {
        // See MediaReceiveTimestampOffset: TryParse also covers the overflow case.
        if (timestampOffset != null && !"reserved".Equals(timestampOffset) &&
            !int.TryParse(
                timestampOffset,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out int _))
        {
            throw new ArgumentException("channelReceiveTimestampOffset must be a number or the value 'reserved'");
        }

        _channelReceiveTimestampOffset = timestampOffset;
        return this;
    }

    /// <summary>Copies the "channel-rcv-ts-offset" parameter from <paramref name="channelUri"/>.</summary>
    public ChannelUriStringBuilder ChannelReceiveTimestampOffset(ChannelUri channelUri)
    {
        return ChannelReceiveTimestampOffset(channelUri.Get("channel-rcv-ts-offset"));
    }

    /// <summary>Current "channel-snd-ts-offset" value, or <c>null</c> when unset.</summary>
    public string ChannelSendTimestampOffset()
    {
        return _channelSendTimestampOffset;
    }

    /// <summary>
    /// Sets the "channel-snd-ts-offset" parameter. Accepts <c>null</c>, the literal "reserved",
    /// or text that parses as a 32-bit integer.
    /// </summary>
    /// <returns>This builder, for chaining.</returns>
    /// <exception cref="ArgumentException">
    /// The value is neither "reserved" nor an integer that fits in an <see cref="int"/>.
    /// </exception>
    public ChannelUriStringBuilder ChannelSendTimestampOffset(string timestampOffset)
    {
        // See MediaReceiveTimestampOffset: TryParse also covers the overflow case.
        if (timestampOffset != null && !"reserved".Equals(timestampOffset) &&
            !int.TryParse(
                timestampOffset,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out int _))
        {
            throw new ArgumentException("channelSendTimestampOffset must be a number or the value 'reserved' found: " + timestampOffset);
        }

        _channelSendTimestampOffset = timestampOffset;
        return this;
    }

    /// <summary>Copies the "channel-snd-ts-offset" parameter from <paramref name="channelUri"/>.</summary>
    public ChannelUriStringBuilder ChannelSendTimestampOffset(ChannelUri channelUri)
    {
        return ChannelSendTimestampOffset(channelUri.Get("channel-snd-ts-offset"));
    }

    public
string Build()
    {
        // Renders "[prefix:]aeron:<media>?key=value|key=value..." from every parameter that is
        // currently set, reusing the shared StringBuilder. Unset (null) parameters are skipped.
        _sb.Length = 0;

        if (_prefix != null && !"".Equals(_prefix))
        {
            _sb.Append(_prefix).Append(':');
        }

        _sb.Append("aeron").Append(':').Append(_media).Append('?');

        if (_tags != null)
        {
            _sb.Append("tags").Append('=').Append(_tags).Append('|');
        }

        if (_endpoint != null)
        {
            _sb.Append("endpoint").Append('=').Append(_endpoint).Append('|');
        }

        if (_networkInterface != null)
        {
            _sb.Append("interface").Append('=').Append(_networkInterface).Append('|');
        }

        if (_controlEndpoint != null)
        {
            _sb.Append("control").Append('=').Append(_controlEndpoint).Append('|');
        }

        if (_controlMode != null)
        {
            _sb.Append("control-mode").Append('=').Append(_controlMode).Append('|');
        }

        if (_mtu.HasValue)
        {
            _sb.Append("mtu").Append('=').Append(_mtu.Value).Append('|');
        }

        if (_termLength.HasValue)
        {
            _sb.Append("term-length").Append('=').Append(_termLength.Value).Append('|');
        }

        if (_initialTermId.HasValue)
        {
            _sb.Append("init-term-id").Append('=').Append(_initialTermId.Value).Append('|');
        }

        if (_termId.HasValue)
        {
            _sb.Append("term-id").Append('=').Append(_termId.Value).Append('|');
        }

        if (_termOffset.HasValue)
        {
            _sb.Append("term-offset").Append('=').Append(_termOffset.Value).Append('|');
        }

        if (_sessionId.HasValue)
        {
            _sb.Append("session-id").Append('=').Append(PrefixTag(_isSessionIdTagged, _sessionId.Value)).Append('|');
        }

        if (_ttl.HasValue)
        {
            _sb.Append("ttl").Append('=').Append(_ttl.Value).Append('|');
        }

        // Boolean parameters are written as lowercase literals. Appending the nullable directly
        // goes through Boolean.ToString(), which yields "True"/"False".
        // NOTE(review): lowercase is what Aeron media drivers are expected to parse; confirm against the driver in use.
        if (_reliable.HasValue)
        {
            _sb.Append("reliable").Append('=').Append(_reliable.Value ? "true" : "false").Append('|');
        }

        if (_linger.HasValue)
        {
            _sb.Append("linger").Append('=').Append(_linger.Value).Append('|');
        }

        if (_alias != null)
        {
            _sb.Append("alias").Append('=').Append(_alias).Append('|');
        }

        if (_cc != null)
        {
            _sb.Append("cc").Append('=').Append(_cc).Append('|');
        }

        if (_fc != null)
        {
            _sb.Append("fc").Append('=').Append(_fc).Append('|');
        }

        if (_groupTag.HasValue)
        {
            _sb.Append("gtag").Append('=').Append(_groupTag.Value).Append('|');
        }

        if (_sparse.HasValue)
        {
            _sb.Append("sparse").Append('=').Append(_sparse.Value ? "true" : "false").Append('|');
        }

        if (_eos.HasValue)
        {
            _sb.Append("eos").Append('=').Append(_eos.Value ? "true" : "false").Append('|');
        }

        if (_tether.HasValue)
        {
            _sb.Append("tether").Append('=').Append(_tether.Value ? "true" : "false").Append('|');
        }

        if (_group.HasValue)
        {
            _sb.Append("group").Append('=').Append(_group.Value ? "true" : "false").Append('|');
        }

        if (_rejoin.HasValue)
        {
            _sb.Append("rejoin").Append('=').Append(_rejoin.Value ? "true" : "false").Append('|');
        }

        if (_ssc.HasValue)
        {
            _sb.Append("ssc").Append('=').Append(_ssc.Value ? "true" : "false").Append('|');
        }

        if (_socketSndbufLength.HasValue)
        {
            _sb.Append("so-sndbuf").Append('=').Append(_socketSndbufLength.Value).Append('|');
        }

        if (_socketRcvbufLength.HasValue)
        {
            _sb.Append("so-rcvbuf").Append('=').Append(_socketRcvbufLength.Value).Append('|');
        }

        if (_receiverWindowLength.HasValue)
        {
            _sb.Append("rcv-wnd").Append('=').Append(_receiverWindowLength.Value).Append('|');
        }

        if (_mediaReceiveTimestampOffset != null)
        {
            _sb.Append("media-rcv-ts-offset").Append('=').Append(_mediaReceiveTimestampOffset).Append('|');
        }

        if (_channelReceiveTimestampOffset != null)
        {
            _sb.Append("channel-rcv-ts-offset").Append('=').Append(_channelReceiveTimestampOffset).Append('|');
        }

        if (_channelSendTimestampOffset != null)
        {
            _sb.Append("channel-snd-ts-offset").Append('=').Append(_channelSendTimestampOffset).Append('|');
        }

        // Drop the trailing separator: '|' after the last parameter, or '?' when none were written.
        char last = _sb[_sb.Length - 1];
        if (last == '|' || last == '?')
        {
            _sb.Length -= 1;
        }

        return _sb.ToString();
    }

    /// <summary>Returns the same text as <see cref="Build"/>.</summary>
    public override string ToString()
    {
        return Build();
    }

    /// <summary>Formats <paramref name="value"/>, prefixed with "tag:" when <paramref name="isTagged"/> is set.</summary>
    private static string PrefixTag(bool isTagged, long? value)
    {
        if (!isTagged)
        {
            return value.ToString();
        }

        long?
num = value; return "tag:" + num; } }

/// <summary>
/// Agent that services driver events for an <see cref="Aeron"/> client and tracks the resources
/// (publications, subscriptions, counters) registered through it.
/// </summary>
internal class ClientConductor : IAgent
{
    private const long NO_CORRELATION_ID = -1L;

    // Linger passed when a resource is closed explicitly (1,000,000,000 ns).
    private static readonly long EXPLICIT_CLOSE_LINGER_NS = 1000000000L;

    private readonly long _keepAliveIntervalNs;

    private readonly long _driverTimeoutMs;

    private readonly long _driverTimeoutNs;

    private readonly long _interServiceTimeoutNs;

    private long _timeOfLastKeepAliveNs;

    private long _timeOfLastServiceNs;

    private bool _isClosed;

    // Set while a user-supplied handler is running.
    private bool _isInCallback;

    private bool _isTerminating;

    // Most recent error reported by the driver through OnError.
    private RegistrationException _driverException;

    private readonly Aeron.Context _ctx;

    private readonly Aeron _aeron;

    private readonly ILock _clientLock;

    private readonly IEpochClock _epochClock;

    private readonly INanoClock _nanoClock;

    private readonly IIdleStrategy _awaitingIdleStrategy;

    private readonly DriverEventsAdapter _driverEventsAdapter;

    private readonly ILogBuffersFactory _logBuffersFactory;

    private readonly Map<long, LogBuffers> _logBuffersByIdMap = new Map<long, LogBuffers>((LogBuffers)null);

    private readonly List<LogBuffers> _lingeringLogBuffers = new List<LogBuffers>();

    // Publications, subscriptions and counters keyed by the id they were registered under.
    private readonly Map<long, object> _resourceByRegIdMap = new Map<long, object>((object)null);

    // Channel text remembered between sending an add-publication command and its response.
    private readonly Map<long, string> _stashedChannelByRegistrationId = new Map<long, string>((string)null);

    // Ids of commands whose response is not awaited by the caller.
    private readonly HashSet<long> _asyncCommandIdSet = new HashSet<long>();

    private readonly AvailableImageHandler _defaultAvailableImageHandler;

    private readonly UnavailableImageHandler _defaultUnavailableImageHandler;

    private readonly Map<long, AvailableCounterHandler> _availableCounterHandlers = new Map<long, AvailableCounterHandler>((AvailableCounterHandler)null);

    private readonly Map<long, UnavailableCounterHandler> _unavailableCounterHandlers = new Map<long, UnavailableCounterHandler>((UnavailableCounterHandler)null);

    private readonly Map<long, Action> _closeHandlersByIdMap = new Map<long, Action>((Action)null);

    private readonly DriverProxy _driverProxy;

    private readonly AgentInvoker _driverAgentInvoker;

    private readonly UnsafeBuffer _counterValuesBuffer;

    private readonly CountersReader _countersReader;

    private AtomicCounter _heartbeatTimestamp;

    /// <summary>
    /// Captures collaborators and timeouts from <paramref name="ctx"/>, registers any counter and
    /// close handlers configured on it, and stamps the keep-alive and service times with the current nano time.
    /// </summary>
    internal ClientConductor(Aeron.Context ctx, Aeron aeron)
    {
        _ctx = ctx;
        _aeron = aeron;
        _clientLock = ctx.ClientLock();
        _epochClock = ctx.EpochClock();
        _nanoClock = ctx.NanoClock();
        _awaitingIdleStrategy = ctx.AwaitingIdleStrategy();
        _driverProxy = ctx.DriverProxy();
        _logBuffersFactory = ctx.LogBuffersFactory();
        _keepAliveIntervalNs = ctx.KeepAliveIntervalNs();
        _driverTimeoutMs = ctx.DriverTimeoutMs();
        _driverTimeoutNs = _driverTimeoutMs * 1000000;
        _interServiceTimeoutNs = ctx.InterServiceTimeoutNs();
        _defaultAvailableImageHandler = ctx.AvailableImageHandler();
        _defaultUnavailableImageHandler = ctx.UnavailableImageHandler();
        _driverEventsAdapter = new DriverEventsAdapter(ctx.ClientId(), ctx.ToClientBuffer(), this, _asyncCommandIdSet);
        _driverAgentInvoker = ctx.DriverAgentInvoker();
        _counterValuesBuffer = ctx.CountersValuesBuffer();
        _countersReader = new CountersReader((IAtomicBuffer)(object)ctx.CountersMetaDataBuffer(), (IAtomicBuffer)(object)ctx.CountersValuesBuffer(), Encoding.ASCII);

        if (ctx.AvailableCounterHandler() != null)
        {
            _availableCounterHandlers.Put(aeron.NextCorrelationId(), ctx.AvailableCounterHandler());
        }

        if (ctx.UnavailableCounterHandler() != null)
        {
            _unavailableCounterHandlers.Put(aeron.NextCorrelationId(), ctx.UnavailableCounterHandler());
        }

        if (ctx.CloseHandler() != null)
        {
            _closeHandlersByIdMap.Put(aeron.NextCorrelationId(), ctx.CloseHandler());
        }

        _timeOfLastServiceNs = (_timeOfLastKeepAliveNs = _nanoClock.NanoTime());
    }

    /// <summary>No start-up work is required.</summary>
    public void OnStart()
    {
    }

    /// <summary>
    /// Closes the client under the client lock: closes the owning <see cref="Aeron"/> if needed,
    /// force-closes resources, notifies close handlers, lingers, disposes lingering log buffers,
    /// tells the driver the client is closing and disposes the context. Does nothing if already closed.
    /// </summary>
    public void OnClose()
    {
        bool wasInterrupted = false;
        _clientLock.Lock();
        try
        {
            if (_isClosed)
            {
                return;
            }

            if (!_aeron.IsClosed)
            {
                _aeron.InternalClose();
            }

            bool isTerminating = _isTerminating;
            _isTerminating = true;
            ForceCloseResources();
            NotifyCloseHandlers();

            try
            {
                // An extra idle sleep is taken only when termination had already begun before this call.
                if (isTerminating)
                {
                    Thread.Sleep(Aeron.Configuration.IdleSleepMs);
                }

                Thread.Sleep((int)TimeUnit.NANOSECONDS.ToMillis(_ctx.CloseLingerDurationNs()));
            }
            catch (ThreadInterruptedException)
            {
                // Remember the interrupt so it can be re-raised once clean-up has finished.
                wasInterrupted = true;
            }

            foreach (LogBuffers lingeringLogBuffer in _lingeringLogBuffers)
            {
                CloseHelper.Dispose(_ctx.ErrorHandler(), (IDisposable)lingeringLogBuffer);
            }

            _driverProxy.ClientClose();
            _ctx.Dispose();
            _ctx.CountersMetaDataBuffer().Wrap(0, 0);
            _ctx.CountersValuesBuffer().Wrap(0, 0);
        }
        finally
        {
            _isClosed = true;
            if (wasInterrupted)
            {
                Thread.CurrentThread.Interrupt();
            }

            _clientLock.Unlock();
        }
    }

    /// <summary>
    /// Services the conductor once if the client lock can be taken without waiting.
    /// </summary>
    /// <returns>The value returned by the service pass, or 0 when the lock was not acquired.</returns>
    /// <exception cref="AgentTerminationException">The conductor is terminating.</exception>
    public int DoWork()
    {
        int workCount = 0;
        if (_clientLock.TryLock())
        {
            try
            {
                if (_isTerminating)
                {
                    throw new AgentTerminationException();
                }

                workCount = Service(NO_CORRELATION_ID);
            }
            finally
            {
                _clientLock.Unlock();
            }
        }

        return workCount;
    }

    /// <summary>Name reported for this agent.</summary>
    public string RoleName()
    {
        return "aeron-client-conductor";
    }

    internal bool IsClosed()
    {
        return _isClosed;
    }

    internal bool IsTerminating()
    {
        return _isTerminating;
    }

    /// <summary>
    /// Records a driver error. A subscription registered under <paramref name="correlationId"/> is
    /// closed and removed; otherwise, if the id belongs to an async command, the error is passed to the error handler.
    /// </summary>
    internal void OnError(long correlationId, int codeValue, ErrorCode errorCode, string message)
    {
        _driverException = new RegistrationException(correlationId, codeValue, errorCode, message);

        if (_resourceByRegIdMap.Get(correlationId) is Subscription subscription)
        {
            subscription.InternalClose(-1L);
            _resourceByRegIdMap.Remove(correlationId);
        }
        else if (_asyncCommandIdSet.Remove(correlationId))
        {
            _stashedChannelByRegistrationId.Remove(correlationId);
            HandleError(new RegistrationException(correlationId, codeValue, errorCode, message));
        }
    }

    /// <summary>Drops any stashed channel for the command and passes the error to the error handler.</summary>
    internal void OnAsyncError(long correlationId, int codeValue, ErrorCode errorCode, string message)
    {
        _stashedChannelByRegistrationId.Remove(correlationId);
        HandleError(new RegistrationException(correlationId, codeValue, errorCode, message));
    }

    internal void OnChannelEndpointError(long correlationId, string message)
    {
        int num = (int)correlationId;
        foreach (object
value in _resourceByRegIdMap.Values)
        {
            if (value is Subscription subscription)
            {
                if (subscription.ChannelStatusId == num)
                {
                    HandleError(new ChannelEndpointException(num, message));
                }
            }
            else if (value is Publication publication && publication.ChannelStatusId == num)
            {
                HandleError(new ChannelEndpointException(num, message));
            }
        }

        if (_asyncCommandIdSet.Remove(correlationId))
        {
            _stashedChannelByRegistrationId.Remove(correlationId);
            HandleError(new RegistrationException(correlationId, 4, ErrorCode.CHANNEL_ENDPOINT_ERROR, message));
        }
    }

    /// <summary>
    /// Builds a <see cref="ConcurrentPublication"/> from the driver's response, using the channel
    /// stashed for <paramref name="correlationId"/>, and registers it under that id.
    /// </summary>
    internal void OnNewPublication(long correlationId, long registrationId, int streamId, int sessionId, int publicationLimitId, int statusIndicatorId, string logFileName)
    {
        string channel = _stashedChannelByRegistrationId.Remove(correlationId);
        ConcurrentPublication publication = new ConcurrentPublication(
            this,
            channel,
            streamId,
            sessionId,
            (IReadablePosition)new UnsafeBufferPosition(_counterValuesBuffer, publicationLimitId),
            statusIndicatorId,
            LogBuffers(registrationId, logFileName, channel),
            registrationId,
            correlationId);

        _resourceByRegIdMap.Put(correlationId, (object)publication);
    }

    /// <summary>
    /// Builds an <see cref="ExclusivePublication"/> from the driver's response and registers it under
    /// <paramref name="correlationId"/>. A mismatch between the two ids is reported to the error
    /// handler, after which processing continues.
    /// </summary>
    internal void OnNewExclusivePublication(long correlationId, long registrationId, int streamId, int sessionId, int publicationLimitId, int statusIndicatorId, string logFileName)
    {
        if (correlationId != registrationId)
        {
            HandleError(new InvalidOperationException("correlationId=" + correlationId + " registrationId=" + registrationId));
        }

        string channel = _stashedChannelByRegistrationId.Remove(correlationId);
        ExclusivePublication publication = new ExclusivePublication(
            this,
            channel,
            streamId,
            sessionId,
            (IReadablePosition)new UnsafeBufferPosition(_counterValuesBuffer, publicationLimitId),
            statusIndicatorId,
            LogBuffers(registrationId, logFileName, channel),
            registrationId,
            correlationId);

        _resourceByRegIdMap.Put(correlationId, (object)publication);
    }

    /// <summary>Stores the channel status indicator id on the subscription registered under <paramref name="correlationId"/>.</summary>
    internal void OnNewSubscription(long correlationId, int statusIndicatorId)
    {
        ((Subscription)_resourceByRegIdMap.Get(correlationId)).ChannelStatusId = statusIndicatorId;
    }

    /// <summary>
    /// Creates an <see cref="Image"/> for a known subscription, invokes the subscription's
    /// available-image handler (if any) and then adds the image to the subscription.
    /// Ignored when the subscription is not registered.
    /// </summary>
    internal void OnAvailableImage(long correlationId, int sessionId, long subscriptionRegistrationId, int subscriberPositionId, string logFileName, string sourceIdentity)
    {
        Subscription subscription = (Subscription)_resourceByRegIdMap.Get(subscriptionRegistrationId);
        if (subscription == null)
        {
            return;
        }

        Image image = new Image(
            subscription,
            sessionId,
            (IPosition)new UnsafeBufferPosition(_counterValuesBuffer, subscriberPositionId),
            LogBuffers(correlationId, logFileName, subscription.Channel),
            _ctx.SubscriberErrorHandler(),
            sourceIdentity,
            correlationId);

        AvailableImageHandler handler = subscription.AvailableImageHandler;
        if (handler != null)
        {
            _isInCallback = true;
            try
            {
                handler(image);
            }
            catch (Exception ex)
            {
                // A failing user handler must not stop the image from being added below.
                HandleError(ex);
            }
            finally
            {
                _isInCallback = false;
            }
        }

        subscription.AddImage(image);
    }

    /// <summary>
    /// Removes the image from its subscription and, when one was removed, notifies the
    /// subscription's unavailable-image handler if set. Ignored when the subscription is not registered.
    /// </summary>
    internal void OnUnavailableImage(long correlationId, long subscriptionRegistrationId)
    {
        Subscription subscription = (Subscription)_resourceByRegIdMap.Get(subscriptionRegistrationId);
        if (subscription == null)
        {
            return;
        }

        Image image = subscription.RemoveImage(correlationId);
        if (image == null)
        {
            return;
        }

        UnavailableImageHandler handler = subscription.UnavailableImageHandler;
        if (handler != null)
        {
            NotifyImageUnavailable(handler, image);
        }
    }

    /// <summary>Registers a new <see cref="Counter"/> under <paramref name="correlationId"/> and announces it as available.</summary>
    internal void OnNewCounter(long correlationId, int counterId)
    {
        _resourceByRegIdMap.Put(correlationId, (object)new Counter(correlationId, this, (IAtomicBuffer)(object)_counterValuesBuffer, counterId));
        OnAvailableCounter(correlationId, counterId);
    }

    /// <summary>Notifies every registered available-counter handler.</summary>
    internal void OnAvailableCounter(long registrationId, int counterId)
    {
        foreach (AvailableCounterHandler handler in _availableCounterHandlers.Values)
        {
            NotifyCounterAvailable(registrationId, counterId, handler);
        }
    }

    /// <summary>Notifies the registered unavailable-counter handlers.</summary>
    internal void OnUnavailableCounter(long registrationId, int counterId)
    {
        NotifyUnavailableCounterHandlers(registrationId, counterId);
    }

    /// <summary>
    /// Handles a client timeout reported by the driver: marks the conductor as terminating,
    /// force-closes resources and reports a <see cref="ClientTimeoutException"/>. Ignored once closed.
    /// </summary>
    internal void OnClientTimeout()
    {
        if (_isClosed)
        {
            return;
        }

        _isTerminating = true;
        ForceCloseResources();
        HandleError(new ClientTimeoutException("client timeout from driver"));
    }

    internal CountersReader CountersReader()
    {
        return _countersReader;
    }

    /// <summary>Passes <paramref name="ex"/> to the context's error handler unless the conductor is closed.</summary>
    internal void HandleError(Exception ex)
    {
        if (!_isClosed)
        {
            _ctx.ErrorHandler().Invoke(ex);
        }
    }

    /// <summary>
    /// Sends an add-publication command, waits for the driver's response under the client lock,
    /// and returns the publication registered for it.
    /// </summary>
    internal ConcurrentPublication AddPublication(string channel, int streamId)
    {
        _clientLock.Lock();
        try
        {
            EnsureActive();
            EnsureNotReentrant();

            long registrationId = _driverProxy.AddPublication(channel, streamId);
            _stashedChannelByRegistrationId.Put(registrationId, channel);
            AwaitResponse(registrationId);

            return (ConcurrentPublication)_resourceByRegIdMap.Get(registrationId);
        }
        finally
        {
            _clientLock.Unlock();
        }
    }

    /// <summary>
    /// Sends an add-exclusive-publication command, waits for the driver's response under the client
    /// lock, and returns the publication registered for it.
    /// </summary>
    internal ExclusivePublication AddExclusivePublication(string channel, int streamId)
    {
        _clientLock.Lock();
        try
        {
            EnsureActive();
            EnsureNotReentrant();

            long registrationId = _driverProxy.AddExclusivePublication(channel, streamId);
            _stashedChannelByRegistrationId.Put(registrationId, channel);
            AwaitResponse(registrationId);

            return (ExclusivePublication)_resourceByRegIdMap.Get(registrationId);
        }
        finally
        {
            _clientLock.Unlock();
        }
    }

    /// <summary>
    /// Sends an add-publication command without waiting for the response.
    /// </summary>
    /// <returns>The id of the command, which is also recorded as an outstanding async command.</returns>
    internal long AsyncAddPublication(string channel, int streamId)
    {
        _clientLock.Lock();
        try
        {
            EnsureActive();
            EnsureNotReentrant();

            long registrationId = _driverProxy.AddPublication(channel, streamId);
            _stashedChannelByRegistrationId.Put(registrationId, channel);
            _asyncCommandIdSet.Add(registrationId);

            return registrationId;
        }
        finally
        {
            _clientLock.Unlock();
        }
    }

    /// <summary>
    /// Sends an add-exclusive-publication command without waiting for the response.
    /// </summary>
    /// <returns>The id of the command, which is also recorded as an outstanding async command.</returns>
    internal long AsyncAddExclusivePublication(string channel, int streamId)
    {
        _clientLock.Lock();
        try
        {
            EnsureActive();
            EnsureNotReentrant();
            long num =
_driverProxy.AddExclusivePublication(channel, streamId);
            _stashedChannelByRegistrationId.Put(num, channel);
            _asyncCommandIdSet.Add(num);
            return num;
        }
        finally
        {
            _clientLock.Unlock();
        }
    }

    /// <summary>
    /// Returns the concurrent publication registered under <paramref name="registrationId"/>,
    /// servicing the conductor once first when that id is still an outstanding async command.
    /// </summary>
    internal ConcurrentPublication GetPublication(long registrationId)
    {
        _clientLock.Lock();
        try
        {
            EnsureActive();
            EnsureNotReentrant();

            bool isStillPending = _asyncCommandIdSet.Contains(registrationId);
            if (isStillPending)
            {
                Service(NO_CORRELATION_ID);
            }

            object resource = _resourceByRegIdMap.Get(registrationId);
            return (ConcurrentPublication)resource;
        }
        finally
        {
            _clientLock.Unlock();
        }
    }

    /// <summary>
    /// Returns the exclusive publication registered under <paramref name="registrationId"/>,
    /// servicing the conductor once first when that id is still an outstanding async command.
    /// </summary>
    internal ExclusivePublication GetExclusivePublication(long registrationId)
    {
        _clientLock.Lock();
        try
        {
            EnsureActive();
            EnsureNotReentrant();

            bool isStillPending = _asyncCommandIdSet.Contains(registrationId);
            if (isStillPending)
            {
                Service(NO_CORRELATION_ID);
            }

            object resource = _resourceByRegIdMap.Get(registrationId);
            return (ExclusivePublication)resource;
        }
        finally
        {
            _clientLock.Unlock();
        }
    }

    /// <summary>
    /// Closes <paramref name="publication"/> and, if it was the resource registered under its id,
    /// releases its log buffers and sends a remove command to the driver.
    /// Does nothing when the conductor is terminating or closed, or the publication is already closed.
    /// </summary>
    internal void RemovePublication(Publication publication)
    {
        _clientLock.Lock();
        try
        {
            if (_isTerminating || _isClosed || publication.IsClosed)
            {
                return;
            }

            EnsureNotReentrant();
            publication.InternalClose();

            object removed = _resourceByRegIdMap.Remove(publication.RegistrationId);
            if (publication == removed)
            {
                ReleaseLogBuffers(publication.LogBuffers, publication.OriginalRegistrationId, EXPLICIT_CLOSE_LINGER_NS);

                long removeCommandId = _driverProxy.RemovePublication(publication.RegistrationId);
                _asyncCommandIdSet.Add(removeCommandId);
            }
        }
        finally
        {
            _clientLock.Unlock();
        }
    }

    /// <summary>
    /// Removes a publication by registration id. A registered publication is closed and its log
    /// buffers released; the driver is told to remove it when it was registered or its add command
    /// was still outstanding. Ignored for id -1 or when terminating or closed.
    /// </summary>
    /// <exception cref="AeronException">The id is registered to something other than a publication.</exception>
    internal void RemovePublication(long publicationRegistrationId)
    {
        _clientLock.Lock();
        try
        {
            if (-1 == publicationRegistrationId || _isTerminating || _isClosed)
            {
                return;
            }

            EnsureNotReentrant();

            object resource = _resourceByRegIdMap.Get(publicationRegistrationId);
            if (resource != null && !(resource is Publication))
            {
                throw new AeronException("registration id is not a Publication");
            }

            Publication publication = (Publication)resource;
            if (publication != null)
            {
                _resourceByRegIdMap.Remove(publicationRegistrationId);
                publication.InternalClose();
                ReleaseLogBuffers(publication.LogBuffers, publication.OriginalRegistrationId, EXPLICIT_CLOSE_LINGER_NS);
            }

            // The pending-command entry is always cleared, even when a publication was found.
            bool wasPending = _asyncCommandIdSet.Remove(publicationRegistrationId);
            if (wasPending || publication != null)
            {
                _driverProxy.RemovePublication(publicationRegistrationId);
                _stashedChannelByRegistrationId.Remove(publicationRegistrationId);
            }
        }
        finally
        {
            _clientLock.Unlock();
        }
    }

    /// <summary>Adds a subscription using the default image handlers captured at construction.</summary>
    internal Subscription AddSubscription(string channel, int streamId)
    {
        return AddSubscription(channel, streamId, _defaultAvailableImageHandler, _defaultUnavailableImageHandler);
    }

    /// <summary>
    /// Sends an add-subscription command, registers the new <see cref="Subscription"/> under the
    /// returned id, then waits for the driver's response before returning it.
    /// </summary>
    internal Subscription AddSubscription(string channel, int streamId, AvailableImageHandler availableImageHandler, UnavailableImageHandler unavailableImageHandler)
    {
        _clientLock.Lock();
        try
        {
            EnsureActive();
            EnsureNotReentrant();

            long registrationId = _driverProxy.AddSubscription(channel, streamId);
            Subscription created = new Subscription(this, channel, streamId, registrationId, availableImageHandler, unavailableImageHandler);
            _resourceByRegIdMap.Put(registrationId, (object)created);
            AwaitResponse(registrationId);

            return created;
        }
        finally
        {
            _clientLock.Unlock();
        }
    }

    /// <summary>
    /// Closes <paramref name="subscription"/> and, if it was the resource registered under its id,
    /// sends a remove command to the driver. Does nothing when the conductor is terminating or
    /// closed, or the subscription is already closed.
    /// </summary>
    internal void RemoveSubscription(Subscription subscription)
    {
        _clientLock.Lock();
        try
        {
            if (_isTerminating || _isClosed || subscription.IsClosed)
            {
                return;
            }

            EnsureNotReentrant();
            subscription.InternalClose(EXPLICIT_CLOSE_LINGER_NS);

            long registrationId = subscription.RegistrationId;
            object removed = _resourceByRegIdMap.Remove(registrationId);
            if (subscription == removed)
            {
                long removeCommandId = _driverProxy.RemoveSubscription(registrationId);
                _asyncCommandIdSet.Add(removeCommandId);
            }
        }
        finally
        {
            _clientLock.Unlock();
        }
    }

    /// <summary>Sends an add-destination command and waits for the driver's response.</summary>
    internal void AddDestination(long registrationId, string endpointChannel)
    {
        _clientLock.Lock();
        try
        {
            EnsureActive();
            EnsureNotReentrant();

            long commandId = _driverProxy.AddDestination(registrationId, endpointChannel);
            AwaitResponse(commandId);
        }
        finally
        {
            _clientLock.Unlock();
        }
    }

    /// <summary>Sends a remove-destination command and waits for the driver's response.</summary>
    internal void RemoveDestination(long registrationId, string endpointChannel)
    {
        _clientLock.Lock();
        try
        {
            EnsureActive();
            EnsureNotReentrant();

            long commandId = _driverProxy.RemoveDestination(registrationId, endpointChannel);
            AwaitResponse(commandId);
        }
        finally
        {
            _clientLock.Unlock();
        }
    }

    internal void AddRcvDestination(long registrationId, string endpointChannel)
    {
        _clientLock.Lock();
        try
        {
EnsureActive(); EnsureNotReentrant(); AwaitResponse(_driverProxy.AddRcvDestination(registrationId, endpointChannel)); } finally { _clientLock.Unlock(); } } internal void RemoveRcvDestination(long registrationId, string endpointChannel) { _clientLock.Lock(); try { EnsureActive(); EnsureNotReentrant(); AwaitResponse(_driverProxy.RemoveRcvDestination(registrationId, endpointChannel)); } finally { _clientLock.Unlock(); } } internal long AsyncAddDestination(long registrationId, string endpointChannel) { _clientLock.Lock(); try { EnsureActive(); EnsureNotReentrant(); long num = _driverProxy.AddDestination(registrationId, endpointChannel); _asyncCommandIdSet.Add(num); return num; } finally { _clientLock.Unlock(); } } internal long AsyncRemoveDestination(long registrationId, string endpointChannel) { _clientLock.Lock(); try { EnsureActive(); EnsureNotReentrant(); long num = _driverProxy.RemoveDestination(registrationId, endpointChannel); _asyncCommandIdSet.Add(num); return num; } finally { _clientLock.Unlock(); } } internal long AsyncAddRcvDestination(long registrationId, string endpointChannel) { _clientLock.Lock(); try { EnsureActive(); EnsureNotReentrant(); long num = _driverProxy.AddRcvDestination(registrationId, endpointChannel); _asyncCommandIdSet.Add(num); return num; } finally { _clientLock.Unlock(); } } internal long AsyncRemoveRcvDestination(long registrationId, string endpointChannel) { _clientLock.Lock(); try { EnsureActive(); EnsureNotReentrant(); long num = _driverProxy.RemoveRcvDestination(registrationId, endpointChan