Decompiled source of ULTRAKILLMessedUpSettings v1.0.0
WebTestMQTTVersionHost.dll
Decompiled 3 days agousing System; using System.Collections; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Net.Sockets; using System.Reflection; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Runtime.Versioning; using System.Security; using System.Security.Cryptography; using System.Security.Permissions; using System.Text; using System.Threading; using System.Threading.Tasks; using BepInEx; using BepInEx.Configuration; using BepInEx.Logging; using Configgy; using HarmonyLib; using MQTTnet; using MQTTnet.Client; using MQTTnet.Packets; using MQTTnet.Protocol; using Newtonsoft.Json; using TMPro; using UnityEngine; using UnityEngine.AddressableAssets; using UnityEngine.EventSystems; using UnityEngine.Events; using UnityEngine.InputSystem; using UnityEngine.InputSystem.Utilities; using UnityEngine.Networking; using UnityEngine.ResourceManagement.AsyncOperations; using UnityEngine.SceneManagement; using UnityEngine.UI; [assembly: CompilationRelaxations(8)] [assembly: RuntimeCompatibility(WrapNonExceptionThrows = true)] [assembly: Debuggable(DebuggableAttribute.DebuggingModes.Default | DebuggableAttribute.DebuggingModes.DisableOptimizations | DebuggableAttribute.DebuggingModes.IgnoreSymbolStoreSequencePoints | DebuggableAttribute.DebuggingModes.EnableEditAndContinue)] [assembly: AssemblyTitle("WebTestMQTTVersionHost")] [assembly: AssemblyDescription("")] [assembly: AssemblyConfiguration("")] [assembly: AssemblyCompany("")] [assembly: AssemblyProduct("WebTestMQTTVersionHost")] [assembly: AssemblyCopyright("Copyright © 2023")] [assembly: AssemblyTrademark("")] [assembly: ComVisible(false)] [assembly: Guid("7eb06f5e-d765-4aa9-b273-c87ab21b0de2")] [assembly: AssemblyFileVersion("1.0.0.0")] [assembly: TargetFramework(".NETFramework,Version=v4.8", FrameworkDisplayName = ".NET Framework 4.8")] [assembly: SecurityPermission(SecurityAction.RequestMinimum, SkipVerification = true)] [assembly: AssemblyVersion("1.0.0.0")] [module: UnverifiableCode] public class JsonBindingMQTT { public string path; public bool isComposite; public string[] compositePath; } public class Binding { public string Action { get; set; } public string Id { get; set; } public string Path { get; set; } public string Interactions { get; set; } public string Processors { get; set; } } public class Root { public List<Binding> Bindings { get; set; } } namespace WebTestMQTTVersionHost; internal class FileHandler { public static byte[] Base64ToFile(string base64String) { return Convert.FromBase64String(base64String); } public static void ProcessFile(byte[] fileData, string fileName) { WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Processing file..."); string text = Path.GetExtension(fileName).ToLower(); switch (text) { case ".png": WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"It's an image"); HandlePNG(fileData); break; case ".wav": case ".ogg": WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"It's an audio file"); HandleAudio(fileData, text); break; default: WebTestMQTTVersionHostPlugin.Log.LogWarning((object)("Unsupported file type: " + text)); break; } } private static void HandlePNG(byte[] fileData) { WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Converted image to base64"); ((MonoBehaviour)MonoSingleton<OptionsMenuToManager>.Instance).StartCoroutine(LoadImage(fileData)); } private static IEnumerator LoadImage(byte[] imageBytes) { yield return (object)new WaitForSeconds(0.1f); if (imageBytes == null || imageBytes.Length == 0) { WebTestMQTTVersionHostPlugin.Log.LogError((object)"Image byte array is null or empty."); yield break; } WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Attempting to decode base64 and load image..."); Texture2D texture = new Texture2D(2, 2); try { if (!ImageConversion.LoadImage(texture, imageBytes)) { WebTestMQTTVersionHostPlugin.Log.LogError((object)"LoadImage failed to decode byte array."); yield break; } WebTestMQTTVersionHostPlugin.Log.LogInfo((object)$"Image loaded successfully. Dimensions: {((Texture)texture).width}x{((Texture)texture).height}"); Sprite sprite = Sprite.Create(texture, new Rect(0f, 0f, (float)((Texture)texture).width, (float)((Texture)texture).height), new Vector2(0.5f, 0.5f)); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Sprite created."); OptionsMenuToManager manager = MonoSingleton<OptionsMenuToManager>.Instance; if ((Object)(object)manager != (Object)null) { GameObject go = new GameObject("Image"); go.transform.SetParent(((Component)manager).transform, false); Image img = go.AddComponent<Image>(); img.sprite = sprite; go.transform.localPosition = Vector3.zero; WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Image added to screen."); } else { WebTestMQTTVersionHostPlugin.Log.LogError((object)"OptionsMenuToManager instance is null."); } } catch (Exception ex2) { Exception ex = ex2; WebTestMQTTVersionHostPlugin.Log.LogError((object)("Exception during image loading: " + ex.Message)); } yield return null; } private static void HandleAudio(byte[] fileData, string fileExtension) { string tempPath = Path.Combine(Application.temporaryCachePath, Guid.NewGuid().ToString() + fileExtension); try { File.WriteAllBytes(tempPath, fileData); ((MonoBehaviour)MonoSingleton<OptionsMenuToManager>.Instance).StartCoroutine(LoadAudioClip("file://" + tempPath, (AudioType)((fileExtension == ".wav") ? 20 : 14), delegate(AudioClip audioClip) { //IL_0012: Unknown result type (might be due to invalid IL or missing references) //IL_0018: Expected O, but got Unknown if ((Object)(object)audioClip != (Object)null) { GameObject val = new GameObject("AudioPlayer"); AudioSource val2 = val.AddComponent<AudioSource>(); val2.clip = audioClip; val2.Play(); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Audio is playing!"); ((MonoBehaviour)MonoSingleton<OptionsMenuToManager>.Instance).StartCoroutine(CleanupAfterAudio(val, audioClip)); } else { WebTestMQTTVersionHostPlugin.Log.LogError((object)"AudioClip is null!"); } ((MonoBehaviour)MonoSingleton<OptionsMenuToManager>.Instance).StartCoroutine(DeleteTempFileAfterDelay(tempPath)); })); } catch (Exception ex) { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Failed to process audio file: " + ex.Message)); } } private static IEnumerator LoadAudioClip(string path, AudioType type, Action<AudioClip> onLoaded) { //IL_000e: Unknown result type (might be due to invalid IL or missing references) //IL_000f: Unknown result type (might be due to invalid IL or missing references) UnityWebRequest www = UnityWebRequestMultimedia.GetAudioClip(path, type); try { yield return www.SendWebRequest(); if (www.isNetworkError || www.isHttpError) { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Failed to load audio: " + www.error)); onLoaded?.Invoke(null); } else { AudioClip clip = DownloadHandlerAudioClip.GetContent(www); onLoaded?.Invoke(clip); } } finally { ((IDisposable)www)?.Dispose(); } } private static IEnumerator DeleteTempFileAfterDelay(string path) { yield return (object)new WaitForSeconds(1f); if (File.Exists(path)) { File.Delete(path); } } private static IEnumerator CleanupAfterAudio(GameObject audioPlayer, AudioClip audioClip) { yield return (object)new WaitUntil((Func<bool>)(() => !audioPlayer.GetComponent<AudioSource>().isPlaying)); if ((Object)(object)audioClip != (Object)null) { Object.Destroy((Object)(object)audioClip); } if ((Object)(object)audioPlayer != (Object)null) { Object.Destroy((Object)(object)audioPlayer); } WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Audio cleanup completed."); } } internal class JustATestLogger : MonoBehaviour { private InputManager inman; [HideInInspector] public OptionsManager opm; public List<ActionDisplayConfig> actionConfig; private void Awake() { inman = MonoSingleton<InputManager>.Instance; opm = MonoSingleton<OptionsManager>.Instance; } private void Start() { bool @bool = MonoSingleton<PrefsManager>.Instance.GetBool("scrollVariations", false); bool bool2 = MonoSingleton<PrefsManager>.Instance.GetBool("scrollWeapons", false); } public void ChangeKey(InputAction action, JsonBindingMQTT path, InputControlScheme scheme) { //IL_0007: Unknown result type (might be due to invalid IL or missing references) //IL_0008: Unknown result type (might be due to invalid IL or missing references) //IL_00c3: Unknown result type (might be due to invalid IL or missing references) //IL_00d9: Unknown result type (might be due to invalid IL or missing references) //IL_00de: Unknown result type (might be due to invalid IL or missing references) //IL_0092: Unknown result type (might be due to invalid IL or missing references) //IL_012b: Unknown result type (might be due to invalid IL or missing references) //IL_0130: Unknown result type (might be due to invalid IL or missing references) //IL_0134: Unknown result type (might be due to invalid IL or missing references) //IL_0139: Unknown result type (might be due to invalid IL or missing references) //IL_0378: Unknown result type (might be due to invalid IL or missing references) //IL_037d: Unknown result type (might be due to invalid IL or missing references) //IL_0382: Unknown result type (might be due to invalid IL or missing references) //IL_0391: Unknown result type (might be due to invalid IL or missing references) //IL_0142: Unknown result type (might be due to invalid IL or missing references) //IL_0147: Unknown result type (might be due to invalid IL or missing references) //IL_02c0: Unknown result type (might be due to invalid IL or missing references) //IL_02c5: Unknown result type (might be due to invalid IL or missing references) //IL_032d: Unknown result type (might be due to invalid IL or missing references) //IL_0333: Unknown result type (might be due to invalid IL or missing references) //IL_0335: Unknown result type (might be due to invalid IL or missing references) //IL_033a: Unknown result type (might be due to invalid IL or missing references) //IL_01ca: Unknown result type (might be due to invalid IL or missing references) //IL_01cc: Unknown result type (might be due to invalid IL or missing references) //IL_01d1: Unknown result type (might be due to invalid IL or missing references) //IL_01d7: Unknown result type (might be due to invalid IL or missing references) //IL_0301: Unknown result type (might be due to invalid IL or missing references) if (action == null || path == null) { Debug.LogError((object)"ChangeKey: One or more required parameters is null"); return; } Debug.Log((object)(action.name + " binding path: " + path.path)); InputManager instance = MonoSingleton<InputManager>.Instance; if ((Object)(object)instance == (Object)null) { Debug.LogError((object)"InputManager instance is null"); return; } try { PlayerInput inputSource = instance.InputSource; object obj; if (inputSource == null) { obj = null; } else { InputActions actions = inputSource.Actions; if (actions == null) { obj = null; } else { InputActionAsset asset = actions.asset; obj = ((asset != null) ? ((IEnumerable<InputActionMap>)(object)asset.actionMaps).SelectMany((InputActionMap map) => (IEnumerable<InputAction>)(object)map.actions) : null); } } IEnumerable<InputAction> enumerable = (IEnumerable<InputAction>)obj; InputBinding val = ((IEnumerable<InputBinding>)(object)action.bindings).FirstOrDefault((Func<InputBinding, bool>)((InputBinding b) => ((InputBinding)(ref b)).groups == ((InputControlScheme)(ref scheme)).bindingGroup)); string effectivePath = ((InputBinding)(ref val)).effectivePath; BindingSyntax val2; if (enumerable != null && effectivePath != null) { foreach (InputAction item in enumerable) { if (item == action) { continue; } Enumerator<InputBinding> enumerator2 = item.bindings.GetEnumerator(); try { while (enumerator2.MoveNext()) { InputBinding current2 = enumerator2.Current; if (((InputBinding)(ref current2)).groups == ((InputControlScheme)(ref scheme)).bindingGroup && ((InputBinding)(ref current2)).effectivePath == path.path) { Debug.Log((object)("Conflicting binding detected: " + item.name + " -> " + path.path + ". Updating to " + effectivePath + ".")); val2 = InputActionSetupExtensions.ChangeBinding(item, current2); ((BindingSyntax)(ref val2)).WithPath(effectivePath); } } } finally { ((IDisposable)enumerator2).Dispose(); } } } PlayerInput inputSource2 = instance.InputSource; if (inputSource2 != null) { inputSource2.Disable(); } instance.anyButtonListener?.Dispose(); InputExtensions.WipeAction(action, ((InputControlScheme)(ref scheme)).bindingGroup); if (path.isComposite && path.compositePath != null && path.compositePath.Length != 0) { if (!(action.expectedControlType == "Vector2")) { Debug.LogError((object)("Attempted to create composite binding for unsupported control type: '" + action.expectedControlType + "'")); return; } string[] array = new string[4] { "Up", "Down", "Left", "Right" }; CompositeSyntax val3 = InputActionSetupExtensions.AddCompositeBinding(action, "2DVector", (string)null, (string)null); for (int i = 0; i < Mathf.Min(path.compositePath.Length, array.Length); i++) { if (!string.IsNullOrEmpty(path.compositePath[i])) { ((CompositeSyntax)(ref val3)).With(array[i], path.compositePath[i], ((InputControlScheme)(ref scheme)).bindingGroup, (string)null); } } val = default(InputBinding); val2 = InputActionSetupExtensions.AddBinding(action, val); ((BindingSyntax)(ref val2)).Erase(); } else { val2 = InputActionSetupExtensions.AddBinding(action, new InputBinding(path.path, (string)null, (string)null, (string)null, (string)null, (string)null)); ((BindingSyntax)(ref val2)).WithGroup(((InputControlScheme)(ref scheme)).bindingGroup); } PlayerInput inputSource3 = instance.InputSource; object obj2; if (inputSource3 == null) { obj2 = null; } else { InputActions actions2 = inputSource3.Actions; obj2 = ((actions2 != null) ? actions2.asset : null); } if ((Object)obj2 != (Object)null) { instance.SaveBindings(instance.InputSource.Actions.asset); } FileInfo savedBindingsFile = instance.savedBindingsFile; if (savedBindingsFile != null && savedBindingsFile.Exists) { try { JsonBindingMap val4 = JsonConvert.DeserializeObject<JsonBindingMap>(File.ReadAllText(instance.savedBindingsFile.FullName)); if (val4 != null) { val4.ApplyTo(instance.InputSource.Actions.asset); } } catch (Exception ex) { Debug.LogError((object)("Failed to deserialize or apply binding map: " + ex.Message)); } } instance.actionModified?.Invoke(action); } catch (Exception ex2) { Debug.LogError((object)("Error in ChangeKey: " + ex2.Message)); } finally { PlayerInput inputSource4 = instance.InputSource; if (inputSource4 != null) { inputSource4.Enable(); } instance.anyButtonListener = InputManager.onAnyInput.Subscribe((IObserver<InputControl>)ButtonPressListener.Instance); } } private void LateUpdate() { } public void ScrollOn(bool stuff) { if ((Object)(object)inman == (Object)null) { inman = MonoSingleton<InputManager>.Instance; } if (stuff) { MonoSingleton<PrefsManager>.Instance.SetBool("scrollEnabled", true); inman.ScrOn = true; } else { MonoSingleton<PrefsManager>.Instance.SetBool("scrollEnabled", false); inman.ScrOn = false; } } public void ScrollVariations(int stuff) { if ((Object)(object)inman == (Object)null) { inman = MonoSingleton<InputManager>.Instance; } switch (stuff) { case 0: MonoSingleton<PrefsManager>.Instance.SetBool("scrollWeapons", true); MonoSingleton<PrefsManager>.Instance.SetBool("scrollVariations", false); inman.ScrWep = true; inman.ScrVar = false; break; case 1: MonoSingleton<PrefsManager>.Instance.SetBool("scrollWeapons", false); MonoSingleton<PrefsManager>.Instance.SetBool("scrollVariations", true); inman.ScrWep = false; inman.ScrVar = true; break; default: MonoSingleton<PrefsManager>.Instance.SetBool("scrollWeapons", true); MonoSingleton<PrefsManager>.Instance.SetBool("scrollVariations", true); inman.ScrWep = true; inman.ScrVar = true; break; } } public void ScrollReverse(bool stuff) { if ((Object)(object)inman == (Object)null) { inman = MonoSingleton<InputManager>.Instance; } if (stuff) { MonoSingleton<PrefsManager>.Instance.SetBool("scrollReversed", true); inman.ScrRev = true; } else { MonoSingleton<PrefsManager>.Instance.SetBool("scrollReversed", false); inman.ScrRev = false; } } } internal class MessageHandler : MonoBehaviour { public void HandleMessage(MessageDataJson messageData) { if (messageData.Value == "infinite") { messageData.Value = float.PositiveInfinity.ToString(); } else if (messageData.Value == "-infinite") { messageData.Value = float.NegativeInfinity.ToString(); } WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Changing shit"); if (messageData.Type == "text") { WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Changing text"); ((MonoBehaviour)this).StartCoroutine(waitalil(messageData)); } if (messageData.Type == "setting") { MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has changed " + messageData.Message + " to " + messageData.Value, "", "", 0, false); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Changing setting"); HandleSettingData(messageData); } if (messageData.Type == "graphics") { MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has changed " + messageData.Message + " to " + messageData.Value, "", "", 0, false); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Changing graphics"); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)(((Object)(object)MonoSingleton<GraphicsOptions>.Instance == (Object)null) ? "fucking" : "work")); DebugGraphicsOptionsStatus(); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"graphicssssss"); InvokeGraphicsOptionsMethod(messageData.Message, messageData.Value); } if (messageData.Type == "file") { WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"It is a file"); HandleFileData(messageData); } if (messageData.Type == "Control") { WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Changing control"); HandleControlSettingData(messageData); } if (messageData.Type == "WeaponToggle") { MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has put weapon " + messageData.Message + " to " + messageData.Value, "", "", 0, false); ChangeWeapon(messageData); } } public void ChangeWeapon(MessageDataJson messageData) { bool flag = bool.Parse(messageData.Value); MonoSingleton<PrefsManager>.Instance.SetInt("weapon." + messageData.Message, flag ? 1 : 0); ((MonoBehaviour)this).StartCoroutine(WaitALil()); } private IEnumerator WaitALil() { yield return (object)new WaitForSeconds(0.1f); MonoSingleton<GunSetter>.Instance.ResetWeapons(false); } public void HandleControlSettingData(MessageDataJson controlData) { //IL_00b2: Unknown result type (might be due to invalid IL or missing references) //IL_00b7: Unknown result type (might be due to invalid IL or missing references) //IL_00bb: Unknown result type (might be due to invalid IL or missing references) //IL_00c0: Unknown result type (might be due to invalid IL or missing references) //IL_00c6: Unknown result type (might be due to invalid IL or missing references) //IL_00cb: Unknown result type (might be due to invalid IL or missing references) MonoSingleton<HudMessageReceiver>.Instance.SendHudMessage("Someone set " + controlData.Message + " to " + controlData.Value, "", "", 0, false); InputManager instance = MonoSingleton<InputManager>.Instance; if ((Object)(object)instance == (Object)null) { Debug.LogError((object)"Input Manager is null!"); return; } PlayerInput inputSource = instance.InputSource; if (inputSource == null) { Debug.LogError((object)"Input Source is null!"); return; } InputActionState jump = inputSource.Jump; InputAction val = ((jump != null) ? jump.Action : null); if (val == null) { Debug.LogError((object)"Jump Action is null!"); return; } Debug.Log((object)"Current Jump Action Bindings:"); Enumerator<InputBinding> enumerator = val.bindings.GetEnumerator(); try { while (enumerator.MoveNext()) { InputBinding current = enumerator.Current; Debug.Log((object)("Path: " + ((InputBinding)(ref current)).path + ", Groups: " + ((InputBinding)(ref current)).groups)); } } finally { ((IDisposable)enumerator).Dispose(); } WebTestMQTTVersionHostPlugin.Path = JsonConvert.DeserializeObject<JsonBindingMQTT>(controlData.Value); WebTestMQTTVersionHostPlugin.Name = controlData.Message; WebTestMQTTVersionHostPlugin.LaunchLoggerthing = true; } public void HandleSettingData(MessageDataJson settingData) { try { Type type = ((object)MonoSingleton<OptionsMenuToManager>.Instance).GetType(); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)("Retrieved runtime type: " + type.Name)); OptionsMenuToManager instance = MonoSingleton<OptionsMenuToManager>.Instance; if ((Object)(object)instance == (Object)null) { WebTestMQTTVersionHostPlugin.Log.LogError((object)"OptionsMenuToManager instance is null."); return; } WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Successfully retrieved OptionsMenuToManager instance."); FieldInfo field = type.GetField(settingData.Message, BindingFlags.Instance | BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic); PropertyInfo property = type.GetProperty(settingData.Message, BindingFlags.Instance | BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic); if (field == null && property == null) { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Field or Property '" + settingData.Message + "' not found in OptionsMenuToManager!")); return; } object obj = ((field != null) ? field.GetValue(instance) : property.GetValue(instance)); if (obj == null) { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Field or Property '" + settingData.Message + "' is null!")); return; } try { Slider val = (Slider)((obj is Slider) ? obj : null); if (val != null) { if (float.TryParse(settingData.Value, out var result)) { if (val.maxValue < result) { val.maxValue = result; } if (val.minValue > result) { val.minValue = result; } val.value = result; ((UnityEvent<float>)(object)val.onValueChanged).Invoke(result); } else { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Invalid value '" + settingData.Value + "' for Slider '" + settingData.Message + "'")); } } else { TMP_Dropdown val2 = (TMP_Dropdown)((obj is TMP_Dropdown) ? obj : null); if (val2 != null) { if (int.TryParse(settingData.Value, out var result2)) { val2.value = result2; ((UnityEvent<int>)(object)val2.onValueChanged).Invoke(result2); } else { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Invalid value '" + settingData.Value + "' for TMP_Dropdown '" + settingData.Message + "'")); } } else { Toggle val3 = (Toggle)((obj is Toggle) ? obj : null); if (val3 != null) { if (bool.TryParse(settingData.Value, out var result3)) { val3.isOn = result3; ((UnityEvent<bool>)(object)val3.onValueChanged).Invoke(result3); } else { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Invalid value '" + settingData.Value + "' for Toggle '" + settingData.Message + "'")); } } else { WebTestMQTTVersionHostPlugin.Log.LogError((object)$"Unsupported type '{obj.GetType()}' for '{settingData.Message}'"); } } } } catch (Exception ex) { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Error updating '" + settingData.Message + "': " + ex.Message)); } WebTestMQTTVersionHostPlugin.Log.LogInfo((object)("Successfully updated " + settingData.Message + " with value " + settingData.Value)); } catch (Exception ex2) { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Failed to handle setting data: " + ex2.Message + "\n" + ex2.StackTrace)); } } public static object InvokeGraphicsOptionsMethod(string methodName, string value = null) { WebTestMQTTVersionHostPlugin.Log.LogInfo((object)("Attempting to find and modify variable: " + methodName)); GraphicsOptions val = MonoSingleton<GraphicsOptions>.Instance; if ((Object)(object)val == (Object)null) { WebTestMQTTVersionHostPlugin.Log.LogError((object)"GraphicsOptions Instance is NULL!"); WebTestMQTTVersionHostPlugin.Log.LogError((object)"GraphicsOptions Instance is NULL!"); GraphicsOptions[] array = Object.FindObjectsOfType<GraphicsOptions>(); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)$"Total GraphicsOptions components found: {array.Length}"); if (array.Length == 0) { return null; } val = array[0]; } WebTestMQTTVersionHostPlugin.Log.LogInfo((object)("GraphicsOptions Type: " + ((object)val).GetType().FullName)); FieldInfo field = ((object)val).GetType().GetField(methodName, BindingFlags.Instance | BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic); PropertyInfo property = ((object)val).GetType().GetProperty(methodName, BindingFlags.Instance | BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic); if (field == null && property == null) { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Field or Property '" + methodName + "' not found in GraphicsOptions!")); return null; } object obj = ((field != null) ? field.GetValue(val) : property.GetValue(val)); if (obj == null) { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Field or Property '" + methodName + "' is null!")); return null; } try { Slider val2 = (Slider)((obj is Slider) ? obj : null); if (val2 != null) { if (float.TryParse(value, out var result)) { if (val2.maxValue < result) { val2.maxValue = result; } if (val2.minValue > result) { val2.minValue = result; } val2.value = result; ((UnityEvent<float>)(object)val2.onValueChanged).Invoke(result); } else { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Invalid value '" + value + "' for Slider '" + methodName + "'")); } } else { TMP_Dropdown val3 = (TMP_Dropdown)((obj is TMP_Dropdown) ? obj : null); if (val3 != null) { if (int.TryParse(value, out var result2)) { val3.value = result2; ((UnityEvent<int>)(object)val3.onValueChanged).Invoke(result2); } else { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Invalid value '" + value + "' for TMP_Dropdown '" + methodName + "'")); } } else { Toggle val4 = (Toggle)((obj is Toggle) ? obj : null); if (val4 != null) { if (bool.TryParse(value, out var result3)) { val4.isOn = result3; ((UnityEvent<bool>)(object)val4.onValueChanged).Invoke(result3); } else { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Invalid value '" + value + "' for Toggle '" + methodName + "'")); } } else { WebTestMQTTVersionHostPlugin.Log.LogError((object)$"Unsupported type '{obj.GetType()}' for '{methodName}'"); } } } } catch (Exception ex) { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Error updating '" + methodName + "': " + ex.Message)); return null; } return obj; } public void CheckWhichItIs(string method, object param, GraphicsOptions options) { if (method == "Dithering") { float value = (float)param; options.dithering.value = value; } } public static void DebugGraphicsOptionsStatus() { GraphicsOptions instance = MonoSingleton<GraphicsOptions>.Instance; WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"--- GraphicsOptions Debug ---"); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)$"MonoSingleton Instance: {(Object)(object)instance != (Object)null}"); GraphicsOptions[] array = Object.FindObjectsOfType<GraphicsOptions>(); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)$"Total GraphicsOptions in scene: {array.Length}"); if (array.Length != 0) { GraphicsOptions[] array2 = array; foreach (GraphicsOptions val in array2) { WebTestMQTTVersionHostPlugin.Log.LogInfo((object)("Found GraphicsOptions Component: " + ((Object)val).name)); } } } public static void PrintAvailableMethods() { GraphicsOptions val = Object.FindObjectOfType<GraphicsOptions>(); if ((Object)(object)val == (Object)null) { WebTestMQTTVersionHostPlugin.Log.LogError((object)"No GraphicsOptions component found!"); return; } WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Available Methods in GraphicsOptions:"); MethodInfo[] methods = ((object)val).GetType().GetMethods(BindingFlags.Instance | BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic); foreach (MethodInfo methodInfo in methods) { WebTestMQTTVersionHostPlugin.Log.LogInfo((object)("- " + methodInfo.Name)); } } public void HandleFileData(MessageDataJson messageData) { WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Turning Base64 into bytes"); byte[] fileData = FileHandler.Base64ToFile(messageData.Value); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Converted Base64 into bytes!"); FileHandler.ProcessFile(fileData, messageData.FileName); } private IEnumerator waitalil(MessageDataJson messageData) { yield return (object)new WaitForSeconds(0.1f); HandleTextData(messageData); } public void HandleTextData(MessageDataJson messageData) { //IL_03b7: Unknown result type (might be due to invalid IL or missing references) //IL_03bc: Unknown result type (might be due to invalid IL or missing references) //IL_049e: Unknown result type (might be due to invalid IL or missing references) //IL_04a3: Unknown result type (might be due to invalid IL or missing references) //IL_05f1: Unknown result type (might be due to invalid IL or missing references) //IL_0a45: Unknown result type (might be due to invalid IL or missing references) //IL_0a6b: Unknown result type (might be due to invalid IL or missing references) //IL_0a70: Unknown result type (might be due to invalid IL or missing references) //IL_096b: Unknown result type (might be due to invalid IL or missing references) //IL_0991: Unknown result type (might be due to invalid IL or missing references) //IL_0996: Unknown result type (might be due to invalid IL or missing references) switch (messageData.Message) { case "DupeEnemies": { MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has duped enemies", "", "", 0, false); EnemyIdentifier[] array2 = Object.FindObjectsOfType<EnemyIdentifier>(); foreach (EnemyIdentifier val2 in array2) { Object.Instantiate<EnemyIdentifier>(val2, ((Component)val2).transform.position, Quaternion.identity); } break; } case "BuffEnemies": { MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has buffed enemies", "", "", 0, false); EnemyIdentifier[] array3 = Object.FindObjectsOfType<EnemyIdentifier>(); foreach (EnemyIdentifier val4 in array3) { val4.BuffAll(); } break; } case "DupeEnemy": { MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has duped a random enemy", "", "", 0, false); EnemyIdentifier val5 = (from x in Object.FindObjectsOfType<EnemyIdentifier>() where !x.dead select x).FirstOrDefault(); if ((Object)(object)val5 != (Object)null) { Object.Instantiate<EnemyIdentifier>(val5, ((Component)val5).transform.position, Quaternion.identity); } break; } case "BuffEnemy": { MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has buffed random enemy", "", "", 0, false); EnemyIdentifier val3 = (from x in Object.FindObjectsOfType<EnemyIdentifier>() where !x.dead select x).FirstOrDefault(); if ((Object)(object)val3 != (Object)null) { val3.BuffAll(); } break; } case "SendToLevel": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has send you to " + messageData.Value, "", "", 0, false); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)("Sending player to level " + messageData.Value)); HandleLeveling(messageData.Value); break; case "SpawnPKEY": WebTestMQTTVersionHostPlugin.Log.LogInfo((object)"Spawning PKEY"); HandleSpawnPKEY(messageData.Value); break; case "Yeet": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has yeeted you with force " + messageData.Value, "", "", 0, false); MonoSingleton<NewMovement>.Instance.Launch(GenerateRandomVector3(-180f, -180f, -180f, 180f, 180f, 180f), float.Parse(messageData.Value), false); break; case "SetSpeed": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has put your speed to " + messageData.Value, "", "", 0, false); MonoSingleton<NewMovement>.Instance.walkSpeed = float.Parse(messageData.Value); break; case "SetJump": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has put your jump power to " + messageData.Value, "", "", 0, false); MonoSingleton<NewMovement>.Instance.jumpPower = float.Parse(messageData.Value); break; case "SetDamage": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has set your damage multiplier to " + messageData.Value, "", "", 0, false); WebTestMQTTVersionHostPlugin.Instance.DMGMultiPLR = float.Parse(messageData.Value); break; case "SetDamageEnemy": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has set enemy damage multiplier to " + messageData.Value, "", "", 0, false); WebTestMQTTVersionHostPlugin.Instance.DMGMultiENEMY = float.Parse(messageData.Value); break; case "Dmg": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has damaged you with " + messageData.Value, "", "", 0, false); MonoSingleton<NewMovement>.Instance.GetHurt((int)float.Parse(messageData.Value), false, 1f, false, false, 0.35f, false); break; case "Heal": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has healed you with " + messageData.Value, "", "", 0, false); MonoSingleton<NewMovement>.Instance.GetHealth((int)float.Parse(messageData.Value), false, false); break; case "NoFist": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has removed your fists", "", "", 0, false); MonoSingleton<FistControl>.Instance.NoFist(); break; case "NoWeapons": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has removed your weapons", "", "", 0, false); MonoSingleton<GunControl>.Instance.NoWeapon(); break; case "YesFist": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has readded your fists", "", "", 0, false); MonoSingleton<FistControl>.Instance.YesFist(); break; case "YesWeapons": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has readded your weapons", "", "", 0, false); MonoSingleton<GunControl>.Instance.YesWeapon(); break; case "Time": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has set time to " + messageData.Value, "", "", 0, false); Time.timeScale = float.Parse(messageData.Value); break; case "Scale": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has scaled everything by " + messageData.Value, "", "", 0, false); { foreach (GameObject item in FindAllObjectsInScene()) { if (!IsDescendantOf(item.transform, ((Component)MonoSingleton<NewMovement>.instance).transform) && !((Object)(object)item.GetComponent<RectTransform>() != (Object)null)) { Transform transform2 = item.transform; transform2.localScale += new Vector3(float.Parse(messageData.Value), float.Parse(messageData.Value), float.Parse(messageData.Value)); } } break; } case "Move": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has moved everything by " + messageData.Value, "", "", 0, false); { foreach (GameObject item2 in FindAllObjectsInScene()) { if (!IsDescendantOf(item2.transform, ((Component)MonoSingleton<NewMovement>.instance).transform) && !((Object)(object)item2.GetComponent<RectTransform>() != (Object)null)) { Transform transform = item2.transform; transform.position += new Vector3(float.Parse(messageData.Value), float.Parse(messageData.Value), float.Parse(messageData.Value)); } } break; } case "KillAll": { MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.User + " has killed all enemies", "", "", 0, false); EnemyIdentifier[] array = Object.FindObjectsOfType<EnemyIdentifier>(); foreach (EnemyIdentifier val in array) { val.InstaKill(); } break; } case "SendHudMessage": MonoSingleton<HudMessageReceiver>.instance.SendHudMessage(messageData.Value, "", "", 0, false); break; } } public static GameObject FindObjectEvenIfDisabled(string rootName, string objPath = null, int childNum = 0, bool useChildNum = false) { //IL_000e: Unknown result type (might be due to invalid IL or missing references) //IL_0013: Unknown result type (might be due to invalid IL or missing references) Scene activeScene = SceneManager.GetActiveScene(); GameObject[] rootGameObjects = ((Scene)(ref activeScene)).GetRootGameObjects(); GameObject val = Array.Find(rootGameObjects, (GameObject obj) => ((Object)obj).name == rootName); if ((Object)(object)val == (Object)null) { return null; } GameObject val2 = val; if (!string.IsNullOrEmpty(objPath)) { Transform obj2 = val.transform.Find(objPath); val2 = ((obj2 != null) ? ((Component)obj2).gameObject : null); } if ((Object)(object)val2 != (Object)null && useChildNum && val2.transform.childCount > childNum) { val2 = ((Component)val2.transform.GetChild(childNum)).gameObject; } return val2; } public static List<GameObject> FindAllObjectsInScene() { //IL_0007: Unknown result type (might be due to invalid IL or missing references) //IL_000c: Unknown result type (might be due to invalid IL or missing references) List<GameObject> list = new List<GameObject>(); Scene activeScene = SceneManager.GetActiveScene(); GameObject[] rootGameObjects = ((Scene)(ref activeScene)).GetRootGameObjects(); GameObject[] array = rootGameObjects; foreach (GameObject obj in array) { CollectAllChildren(obj, list); } return list; } private static void CollectAllChildren(GameObject obj, List<GameObject> collectedObjects) { //IL_001e: Unknown result type (might be due to invalid IL or missing references) //IL_0024: Expected O, but got Unknown collectedObjects.Add(obj); foreach (Transform item in obj.transform) { Transform val = item; CollectAllChildren(((Component)val).gameObject, collectedObjects); } } public static bool IsDescendantOf(Transform potentialChild, Transform potentialParent) { Transform val = potentialChild; while ((Object)(object)val != (Object)null) { if ((Object)(object)val == (Object)(object)potentialParent) { return true; } val = val.parent; } return false; } public static Vector3 GenerateRandomVector3(float minX, float maxX, float minY, float maxY, float minZ, float maxZ) { //IL_001e: Unknown result type (might be due to invalid IL or missing references) //IL_0023: Unknown result type (might be due to invalid IL or missing references) //IL_0026: Unknown result type (might be due to invalid IL or missing references) float num = Random.Range(minX, maxX); float num2 = Random.Range(minY, maxY); float num3 = Random.Range(minZ, maxZ); return new Vector3(num, num2, num3); } public void HandleSpawnPKEY(string jsonData) { PKEYValueData pKEYValueData = JsonConvert.DeserializeObject<PKEYValueData>(jsonData); if (pKEYValueData != null) { WebTestMQTTVersionHostPlugin.Log.LogInfo((object)$"Spawning PKEY {pKEYValueData.PKEY} at {pKEYValueData.Vector3Pos}, relative: {pKEYValueData.Relative}, amount: {pKEYValueData.amount}, delay: {pKEYValueData.delay}"); ((MonoBehaviour)this).StartCoroutine(SpawnPKEYAddressableCoroutine(pKEYValueData)); } else { WebTestMQTTVersionHostPlugin.Log.LogWarning((object)"Failed to deserialize PKEY data."); } } private IEnumerator SpawnPKEYAddressableCoroutine(PKEYValueData pkeyData) { Vector3 basePosition = new Vector3(pkeyData.Vector3Pos.x, pkeyData.Vector3Pos.y, pkeyData.Vector3Pos.z); for (int i = 0; (float)i < pkeyData.amount; i++) { SpawnAddressable(position: pkeyData.Relative ? GetRelativePosition(basePosition) : basePosition, pkey: pkeyData.PKEY); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)$"Spawned instance {i + 1}/{pkeyData.amount} of {pkeyData.PKEY}"); yield return (object)new WaitForSeconds(pkeyData.delay); } } public Vector3 GetRelativePosition(Vector3 position) { //IL_000b: Unknown result type (might be due to invalid IL or missing references) //IL_0010: Unknown result type (might be due to invalid IL or missing references) //IL_0011: Unknown result type (might be due to invalid IL or missing references) //IL_0016: Unknown result type (might be due to invalid IL or missing references) //IL_0018: Unknown result type (might be due to invalid IL or missing references) //IL_0019: Unknown result type (might be due to invalid IL or missing references) //IL_001c: Unknown result type (might be due to invalid IL or missing references) position = ((Component)MonoSingleton<NewMovement>.instance).transform.position + position; return position; } public void SpawnAddressable(string pkey, Vector3 position) { //IL_0007: Unknown result type (might be due to invalid IL or missing references) //IL_0008: Unknown result type (might be due to invalid IL or missing references) //IL_0026: Unknown result type (might be due to invalid IL or missing references) //IL_0041: Unknown result type (might be due to invalid IL or missing references) //IL_0046: Unknown result type (might be due to invalid IL or missing references) WebTestMQTTVersionHostPlugin.Log.LogInfo((object)$"Spawning addressable {pkey} at {position}"); AsyncOperationHandle<GameObject> val = Addressables.LoadAssetAsync<GameObject>((object)pkey); val.Completed += delegate(AsyncOperationHandle<GameObject> op) { //IL_0003: Unknown result type (might be due to invalid IL or missing references) //IL_0009: Invalid comparison between Unknown and I4 //IL_0018: Unknown result type (might be due to invalid IL or missing references) //IL_001d: Unknown result type (might be due to invalid IL or missing references) //IL_0039: Unknown result type (might be due to invalid IL or missing references) if ((int)op.Status == 1) { GameObject val2 = Object.Instantiate<GameObject>(op.Result, position, Quaternion.identity); WebTestMQTTVersionHostPlugin.Log.LogInfo((object)$"Successfully spawned {pkey} at {position}"); } else { WebTestMQTTVersionHostPlugin.Log.LogError((object)("Failed to load addressable " + pkey + ". Error: " + op.OperationException?.Message)); } }; } public void HandleLeveling(string levelText) { SceneHelper.LoadScene(levelText, false); } } [BepInPlugin("com.michi.WebTestMQTTVersionHost", "WebTestMQTTVersionHost", "1.0.0")] public class WebTestMQTTVersionHostPlugin : BaseUnityPlugin { private const string MyGUID = "com.michi.WebTestMQTTVersionHost"; private const string PluginName = "WebTestMQTTVersionHost"; private const string VersionString = "1.0.0"; private static readonly Harmony Harmony = new Harmony("com.michi.WebTestMQTTVersionHost"); public static ManualLogSource Log = new ManualLogSource("WebTestMQTTVersionHost"); private ConfigEntry<string> BrokerAddress; private ConfigEntry<int> BrokerPort; private ConfigEntry<string> BrokerUsername; private ConfigEntry<string> BrokerPassword; private const string Topic = "messages/actors"; private const string EncryptionKey = "MySecureKey123!"; private MessageHandler handler; public static bool LaunchLoggerthing = false; public static string Name = ""; public static JsonBindingMQTT Path = null; private IMqttClient mqttClient; public static JsonBindingMap map; private JustATestLogger thingthatisntactualyatestanymore; public float DMGMultiPLR = 1f; public float DMGMultiENEMY = 1f; [Configgable("Setup", null, 4, null)] public static ConfigButton Connect = new ConfigButton((Action)async delegate { await Instance.InitializeMqttClientAsync(); }, (string)null); public static WebTestMQTTVersionHostPlugin Instance { get; private set; } private async void Start() { Instance = this; handler = ((Component)this).gameObject.AddComponent<MessageHandler>(); thingthatisntactualyatestanymore = ((Component)this).gameObject.AddComponent<JustATestLogger>(); ((BaseUnityPlugin)this).Logger.LogInfo((object)"Starting Host Mod..."); BrokerAddress = ((BaseUnityPlugin)this).Config.Bind<string>("Setup", "Broker Address", "", (ConfigDescription)null); BrokerPort = ((BaseUnityPlugin)this).Config.Bind<int>("Setup", "Broker Port", 8883, (ConfigDescription)null); BrokerUsername = ((BaseUnityPlugin)this).Config.Bind<string>("Setup", "Broker Username", "", "Optional"); BrokerPassword = ((BaseUnityPlugin)this).Config.Bind<string>("Setup", "Broker Password", "", "Optional"); ConfigBuilder builder = new ConfigBuilder((string)null, (string)null); builder.BuildAll(); } private async Task InitializeMqttClientAsync() { MqttFactory factory = new MqttFactory(); mqttClient = factory.CreateMqttClient(); MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder().WithTcpServer(BrokerAddress.Value, (int?)BrokerPort.Value, AddressFamily.Unspecified).WithTlsOptions(new MqttClientTlsOptions { UseTls = true, AllowUntrustedCertificates = false, CertificateValidationHandler = (MqttClientCertificateValidationEventArgs context) => true }); if (!string.IsNullOrEmpty(BrokerUsername.Value) && !string.IsNullOrEmpty(BrokerPassword.Value)) { optionsBuilder.WithCredentials(BrokerUsername.Value, BrokerPassword.Value); } MqttClientOptions options = optionsBuilder.Build(); try { mqttClient.ConnectedAsync += async delegate { ((BaseUnityPlugin)this).Logger.LogInfo((object)$"Connected to MQTT broker at {BrokerAddress}:{BrokerPort}."); MonoSingleton<HudMessageReceiver>.instance.SendHudMessage("Succesfully connected to the broker!", "", "", 0, false); await MqttClientExtensions.SubscribeAsync(mqttClient, "messages/actors", (MqttQualityOfServiceLevel)0, default(CancellationToken)); }; mqttClient.DisconnectedAsync += delegate { ((BaseUnityPlugin)this).Logger.LogInfo((object)"Disconnected from MQTT broker."); return Task.CompletedTask; }; mqttClient.ApplicationMessageReceivedAsync += delegate(MqttApplicationMessageReceivedEventArgs e) { string @string = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); string text = DecryptMessage(@string, "MySecureKey123!"); MessageDataJson messageDataJson = JsonConvert.DeserializeObject<MessageDataJson>(text); ((BaseUnityPlugin)this).Logger.LogInfo((object)("Received message: message: " + messageDataJson.Message + ", Type: " + messageDataJson.Type + ", Value: " + messageDataJson.Value)); handler.HandleMessage(messageDataJson); return Task.CompletedTask; }; await mqttClient.ConnectAsync(options, default(CancellationToken)); } catch (Exception ex) { MonoSingleton<HudMessageReceiver>.instance.SendHudMessage("Failed to connect to broker. Check console for more info.", "", "", 0, false); ((BaseUnityPlugin)this).Logger.LogError((object)("Failed to connect to MQTT Broker: " + ex.Message)); } } private static string DecryptMessage(string encryptedText, string key) { using Aes aes = Aes.Create(); aes.Key = Encoding.UTF8.GetBytes(key.PadRight(32).Substring(0, 32)); aes.IV = new byte[16]; ICryptoTransform cryptoTransform = aes.CreateDecryptor(aes.Key, aes.IV); byte[] array = Convert.FromBase64String(encryptedText); byte[] bytes = cryptoTransform.TransformFinalBlock(array, 0, array.Length); return Encoding.UTF8.GetString(bytes); } private void OnDestroy() { IMqttClient obj = mqttClient; if (obj != null) { MqttClientExtensions.DisconnectAsync(obj, (MqttClientDisconnectOptionsReason)0, (string)null, 0u, (List<MqttUserProperty>)null, default(CancellationToken)); } } private void Awake() { ((BaseUnityPlugin)this).Logger.LogInfo((object)"PluginName: WebTestMQTTVersionHost, VersionString: 1.0.0 is loading..."); Harmony.PatchAll(); ((BaseUnityPlugin)this).Logger.LogInfo((object)"PluginName: WebTestMQTTVersionHost, VersionString: 1.0.0 is loaded."); Log = ((BaseUnityPlugin)this).Logger; } private void Update() { //IL_0074: Unknown result type (might be due to invalid IL or missing references) if (LaunchLoggerthing) { LaunchLoggerthing = false; InputAction val = MonoSingleton<InputManager>.instance.InputSource.Actions.FindAction(Name, false); if (val != null && Path != null && Path.path != null) { thingthatisntactualyatestanymore.ChangeKey(val, Path, MonoSingleton<InputManager>.instance.InputSource.Actions.KeyboardMouseScheme); } } } } [Serializable] public class SerializableVector3 { public float x; public float y; public float z; public SerializableVector3(Vector3 vector) { //IL_0009: Unknown result type (might be due to invalid IL or missing references) //IL_0015: Unknown result type (might be due to invalid IL or missing references) //IL_0021: Unknown result type (might be due to invalid IL or missing references) x = vector.x; y = vector.y; z = vector.z; } public Vector3 ToVector3() { //IL_0013: Unknown result type (might be due to invalid IL or missing references) //IL_0018: Unknown result type (might be due to invalid IL or missing references) //IL_001b: Unknown result type (might be due to invalid IL or missing references) return new Vector3(x, y, z); } } public class PKEYValueData { public string PKEY; public SerializableVector3 Vector3Pos; public bool Relative; public float delay; public float amount; } public class JsonBindingMQTT { public string path; public bool isComposite; public string[] compositePath; } public class MessageDataJson { public string User { get; set; } public string Message { get; set; } public string Type { get; set; } public string Value { get; set; } public string FileName { get; set; } } [HarmonyPatch(typeof(Slider), "UpdateDrag")] public class UnrestrictedSliderDragPatch { private static bool Prefix(Slider __instance, PointerEventData eventData, Camera cam) { //IL_001c: Unknown result type (might be due to invalid IL or missing references) //IL_0021: Unknown result type (might be due to invalid IL or missing references) //IL_0024: Unknown result type (might be due to invalid IL or missing references) //IL_0029: Unknown result type (might be due to invalid IL or missing references) //IL_002d: Unknown result type (might be due to invalid IL or missing references) //IL_0037: Expected I4, but got Unknown //IL_0049: Unknown result type (might be due to invalid IL or missing references) //IL_004e: Unknown result type (might be due to invalid IL or missing references) //IL_005a: Unknown result type (might be due to invalid IL or missing references) //IL_0065: Unknown result type (might be due to invalid IL or missing references) //IL_0068: Unknown result type (might be due to invalid IL or missing references) //IL_006d: Unknown result type (might be due to invalid IL or missing references) //IL_0070: Unknown result type (might be due to invalid IL or missing references) //IL_0075: Unknown result type (might be due to invalid IL or missing references) //IL_007a: Unknown result type (might be due to invalid IL or missing references) //IL_007d: Unknown result type (might be due to invalid IL or missing references) //IL_0082: Unknown result type (might be due to invalid IL or missing references) //IL_0085: Unknown result type (might be due to invalid IL or missing references) //IL_008a: Unknown result type (might be due to invalid IL or missing references) //IL_008e: Unknown result type (might be due to invalid IL or missing references) //IL_0098: Expected I4, but got Unknown //IL_009a: Unknown result type (might be due to invalid IL or missing references) //IL_009d: Unknown result type (might be due to invalid IL or missing references) //IL_00a2: Unknown result type (might be due to invalid IL or missing references) //IL_00a7: Unknown result type (might be due to invalid IL or missing references) //IL_00ab: Unknown result type (might be due to invalid IL or missing references) //IL_00b5: Expected I4, but got Unknown RectTransform val = __instance.m_HandleContainerRect ?? __instance.m_FillContainerRect; if ((Object)(object)val != (Object)null) { Rect rect = val.rect; Vector2 val2 = ((Rect)(ref rect)).size; if (((Vector2)(ref val2))[(int)__instance.axis] > 0f) { Vector2 zero = Vector2.zero; MultipleDisplayUtilities.GetRelativeMousePositionForDrag(eventData, ref zero); Vector2 val3 = default(Vector2); RectTransformUtility.ScreenPointToLocalPointInRectangle(val, zero, cam, ref val3); Vector2 val4 = val3; rect = val.rect; val3 = val4 - ((Rect)(ref rect)).position; rect = val.rect; val2 = ((Rect)(ref rect)).size; float num = ((Vector2)(ref val2))[(int)__instance.axis]; val2 = val3 - __instance.m_Offset; float num2 = ((Vector2)(ref val2))[(int)__instance.axis] / num; __instance.m_Value = Mathf.Lerp(__instance.minValue, __instance.maxValue, num2); __instance.UpdateVisuals(); UISystemProfilerApi.AddMarker("Slider.value", (Object)(object)__instance); ((UnityEvent<float>)(object)__instance.m_OnValueChanged).Invoke(__instance.m_Value); } } return false; } } [HarmonyPatch(typeof(NewMovement), "GetHurt")] public class PLRDAMAGE { public static bool Prefix(ref int damage) { damage *= (int)WebTestMQTTVersionHostPlugin.Instance.DMGMultiPLR; return true; } } [HarmonyPatch(typeof(EnemyIdentifier), "DeliverDamage")] public class EnemyDAMAGE { public static bool Prefix(ref float multiplier) { multiplier *= (int)WebTestMQTTVersionHostPlugin.Instance.DMGMultiENEMY; return true; } } [Serializable] public class MeshData { public Vector3[] vertices; public Vector3[] normals; public Vector2[] uvs; public int[] triangles; } [Serializable] public class MaterialData { public string name; public string shaderName; public string textureName; } [Serializable] public class ModelData { public MeshData[] meshes; public MaterialData[] materials; }
MQTTnet.dll
Decompiled 3 days ago
The result has been truncated due to the large size, download it to view full contents!
using System; using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Diagnostics; using System.IO; using System.Linq; using System.Net; using System.Net.Security; using System.Net.Sockets; using System.Net.WebSockets; using System.Reflection; using System.Resources; using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Runtime.InteropServices; using System.Runtime.Versioning; using System.Security; using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; using System.Security.Permissions; using System.Text; using System.Threading; using System.Threading.Tasks; using MQTTnet.Adapter; using MQTTnet.Certificates; using MQTTnet.Channel; using MQTTnet.Client; using MQTTnet.Client.Internal; using MQTTnet.Diagnostics; using MQTTnet.Exceptions; using MQTTnet.Formatter; using MQTTnet.Formatter.V3; using MQTTnet.Formatter.V5; using MQTTnet.Implementations; using MQTTnet.Internal; using MQTTnet.LowLevelClient; using MQTTnet.PacketDispatcher; using MQTTnet.Packets; using MQTTnet.Protocol; using MQTTnet.Server; using MQTTnet.Server.Disconnecting; [assembly: CompilationRelaxations(8)] [assembly: RuntimeCompatibility(WrapNonExceptionThrows = true)] [assembly: Debuggable(DebuggableAttribute.DebuggingModes.IgnoreSymbolStoreSequencePoints)] [assembly: TargetFramework(".NETFramework,Version=v4.8", FrameworkDisplayName = ".NET Framework 4.8")] [assembly: AssemblyCompany("The contributors of MQTTnet")] [assembly: AssemblyConfiguration("Release")] [assembly: AssemblyCopyright("Copyright (c) .NET Foundation and Contributors")] [assembly: AssemblyDescription("MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports v3.1.0, v3.1.1 and v5.0.0 of the MQTT protocol.")] [assembly: AssemblyFileVersion("4.3.7.1207")] [assembly: AssemblyInformationalVersion("4.3.7.1207+740d605fd2c25922fad6ea60e78e8370356e95dd")] [assembly: AssemblyProduct("MQTTnet")] [assembly: AssemblyTitle("MQTTnet")] [assembly: AssemblyMetadata("RepositoryUrl", "https://github.com/dotnet/MQTTnet.git")] [assembly: NeutralResourcesLanguage("en-US")] [assembly: SecurityPermission(SecurityAction.RequestMinimum, SkipVerification = true)] [assembly: AssemblyVersion("4.3.7.1207")] [module: UnverifiableCode] namespace MQTTnet { public sealed class MqttApplicationMessage { private byte[] _payloadCache; private ArraySegment<byte> _payloadSegment = EmptyBuffer.ArraySegment; public string ContentType { get; set; } public byte[] CorrelationData { get; set; } public bool Dup { get; set; } public uint MessageExpiryInterval { get; set; } [Obsolete("Use PayloadSegment instead. This property will be removed in a future release.")] public byte[] Payload { get { if (_payloadSegment.Array == null) { return null; } if (_payloadSegment.Count == _payloadSegment.Array.Length) { return _payloadSegment.Array; } if (_payloadCache == null) { _payloadCache = new byte[_payloadSegment.Count]; MqttMemoryHelper.Copy(_payloadSegment.Array, _payloadSegment.Offset, _payloadCache, 0, _payloadCache.Length); } return _payloadCache; } set { _payloadCache = null; _payloadSegment = ((value == null || value.Length == 0) ? EmptyBuffer.ArraySegment : new ArraySegment<byte>(value)); } } public ArraySegment<byte> PayloadSegment { get { return _payloadSegment; } set { _payloadCache = null; _payloadSegment = value; } } public MqttPayloadFormatIndicator PayloadFormatIndicator { get; set; } public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } public string ResponseTopic { get; set; } public bool Retain { get; set; } public List<uint> SubscriptionIdentifiers { get; set; } public string Topic { get; set; } public ushort TopicAlias { get; set; } public List<MqttUserProperty> UserProperties { get; set; } } public sealed class MqttApplicationMessageBuilder { private string _contentType; private byte[] _correlationData; private uint _messageExpiryInterval; private MqttPayloadFormatIndicator _payloadFormatIndicator; private ArraySegment<byte> _payloadSegment; private MqttQualityOfServiceLevel _qualityOfServiceLevel; private string _responseTopic; private bool _retain; private List<uint> _subscriptionIdentifiers; private string _topic; private ushort _topicAlias; private List<MqttUserProperty> _userProperties; public MqttApplicationMessage Build() { if (_topicAlias == 0 && string.IsNullOrEmpty(_topic)) { throw new MqttProtocolViolationException("Topic or TopicAlias is not set."); } return new MqttApplicationMessage { Topic = _topic, PayloadSegment = _payloadSegment, QualityOfServiceLevel = _qualityOfServiceLevel, Retain = _retain, ContentType = _contentType, ResponseTopic = _responseTopic, CorrelationData = _correlationData, TopicAlias = _topicAlias, SubscriptionIdentifiers = _subscriptionIdentifiers, MessageExpiryInterval = _messageExpiryInterval, PayloadFormatIndicator = _payloadFormatIndicator, UserProperties = _userProperties }; } public MqttApplicationMessageBuilder WithContentType(string contentType) { _contentType = contentType; return this; } public MqttApplicationMessageBuilder WithCorrelationData(byte[] correlationData) { _correlationData = correlationData; return this; } public MqttApplicationMessageBuilder WithMessageExpiryInterval(uint messageExpiryInterval) { _messageExpiryInterval = messageExpiryInterval; return this; } public MqttApplicationMessageBuilder WithPayload(byte[] payload) { _payloadSegment = ((payload == null || payload.Length == 0) ? EmptyBuffer.ArraySegment : new ArraySegment<byte>(payload)); return this; } public MqttApplicationMessageBuilder WithPayload(ArraySegment<byte> payloadSegment) { _payloadSegment = payloadSegment; return this; } public MqttApplicationMessageBuilder WithPayload(IEnumerable<byte> payload) { if (payload == null) { return WithPayload((byte[])null); } if (payload is byte[] payload2) { return WithPayload(payload2); } if (payload is ArraySegment<byte> payloadSegment) { return WithPayloadSegment(payloadSegment); } return WithPayload(payload.ToArray()); } public MqttApplicationMessageBuilder WithPayload(Stream payload) { if (payload != null) { return WithPayload(payload, payload.Length - payload.Position); } return WithPayload((byte[])null); } public MqttApplicationMessageBuilder WithPayload(Stream payload, long length) { if (payload == null || length == 0L) { return WithPayload((byte[])null); } byte[] array = new byte[length]; int num = 0; do { int num2 = payload.Read(array, num, array.Length - num); if (num2 == 0) { break; } num += num2; } while (num < length); return WithPayload(array); } public MqttApplicationMessageBuilder WithPayload(string payload) { if (string.IsNullOrEmpty(payload)) { return WithPayload((byte[])null); } byte[] bytes = Encoding.UTF8.GetBytes(payload); return WithPayload(bytes); } public MqttApplicationMessageBuilder WithPayloadFormatIndicator(MqttPayloadFormatIndicator payloadFormatIndicator) { _payloadFormatIndicator = payloadFormatIndicator; return this; } public MqttApplicationMessageBuilder WithPayloadSegment(ArraySegment<byte> payloadSegment) { _payloadSegment = payloadSegment; return this; } public MqttApplicationMessageBuilder WithQualityOfServiceLevel(MqttQualityOfServiceLevel qualityOfServiceLevel) { _qualityOfServiceLevel = qualityOfServiceLevel; return this; } public MqttApplicationMessageBuilder WithResponseTopic(string responseTopic) { _responseTopic = responseTopic; return this; } public MqttApplicationMessageBuilder WithRetainFlag(bool value = true) { _retain = value; return this; } public MqttApplicationMessageBuilder WithSubscriptionIdentifier(uint subscriptionIdentifier) { if (_subscriptionIdentifiers == null) { _subscriptionIdentifiers = new List<uint>(); } _subscriptionIdentifiers.Add(subscriptionIdentifier); return this; } public MqttApplicationMessageBuilder WithTopic(string topic) { _topic = topic; return this; } public MqttApplicationMessageBuilder WithTopicAlias(ushort topicAlias) { _topicAlias = topicAlias; return this; } public MqttApplicationMessageBuilder WithUserProperty(string name, string value) { if (_userProperties == null) { _userProperties = new List<MqttUserProperty>(); } _userProperties.Add(new MqttUserProperty(name, value)); return this; } } public static class MqttApplicationMessageExtensions { public static string ConvertPayloadToString(this MqttApplicationMessage applicationMessage) { if (applicationMessage == null) { throw new ArgumentNullException("applicationMessage"); } if (applicationMessage.PayloadSegment == EmptyBuffer.ArraySegment) { return null; } if (applicationMessage.PayloadSegment.Array == null) { return null; } ArraySegment<byte> payloadSegment = applicationMessage.PayloadSegment; return Encoding.UTF8.GetString(payloadSegment.Array, payloadSegment.Offset, payloadSegment.Count); } } public static class MqttApplicationMessageValidator { public static void ThrowIfNotSupported(MqttApplicationMessage applicationMessage, MqttProtocolVersion protocolVersion) { if (applicationMessage == null) { throw new ArgumentNullException("applicationMessage"); } if (protocolVersion != MqttProtocolVersion.V500) { string contentType = applicationMessage.ContentType; if (contentType != null && contentType.Any()) { Throw("ContentType"); } List<MqttUserProperty> userProperties = applicationMessage.UserProperties; if (userProperties != null && userProperties.Any()) { Throw("UserProperties"); } byte[] correlationData = applicationMessage.CorrelationData; if (correlationData != null && correlationData.Any()) { Throw("CorrelationData"); } string responseTopic = applicationMessage.ResponseTopic; if (responseTopic != null && responseTopic.Any()) { Throw("ResponseTopic"); } List<uint> subscriptionIdentifiers = applicationMessage.SubscriptionIdentifiers; if (subscriptionIdentifiers != null && subscriptionIdentifiers.Any()) { Throw("SubscriptionIdentifiers"); } if (applicationMessage.TopicAlias > 0) { Throw("TopicAlias"); } if (applicationMessage.PayloadFormatIndicator != 0) { Throw("PayloadFormatIndicator"); } } } private static void Throw(string featureName) { throw new NotSupportedException("Feature " + featureName + " requires MQTT version 5.0.0."); } } public sealed class MqttFactory { private IMqttClientAdapterFactory _clientAdapterFactory; public IMqttNetLogger DefaultLogger { get; } public IList<Func<MqttFactory, IMqttServerAdapter>> DefaultServerAdapters { get; } = new List<Func<MqttFactory, IMqttServerAdapter>> { (MqttFactory factory) => new MqttTcpServerAdapter() }; public IDictionary<object, object> Properties { get; } = new Dictionary<object, object>(); public MqttFactory() : this(new MqttNetNullLogger()) { } public MqttFactory(IMqttNetLogger logger) { DefaultLogger = logger ?? throw new ArgumentNullException("logger"); _clientAdapterFactory = new MqttClientAdapterFactory(); } public MqttApplicationMessageBuilder CreateApplicationMessageBuilder() { return new MqttApplicationMessageBuilder(); } public MqttClientDisconnectOptionsBuilder CreateClientDisconnectOptionsBuilder() { return new MqttClientDisconnectOptionsBuilder(); } public MqttClientOptionsBuilder CreateClientOptionsBuilder() { return new MqttClientOptionsBuilder(); } public ILowLevelMqttClient CreateLowLevelMqttClient() { return CreateLowLevelMqttClient(DefaultLogger); } public ILowLevelMqttClient CreateLowLevelMqttClient(IMqttNetLogger logger) { if (logger == null) { throw new ArgumentNullException("logger"); } return new LowLevelMqttClient(_clientAdapterFactory, logger); } public ILowLevelMqttClient CreateLowLevelMqttClient(IMqttClientAdapterFactory clientAdapterFactory) { if (clientAdapterFactory == null) { throw new ArgumentNullException("clientAdapterFactory"); } return new LowLevelMqttClient(_clientAdapterFactory, DefaultLogger); } public ILowLevelMqttClient CreateLowLevelMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory clientAdapterFactory) { if (logger == null) { throw new ArgumentNullException("logger"); } if (clientAdapterFactory == null) { throw new ArgumentNullException("clientAdapterFactory"); } return new LowLevelMqttClient(_clientAdapterFactory, logger); } public IMqttClient CreateMqttClient() { return CreateMqttClient(DefaultLogger); } public IMqttClient CreateMqttClient(IMqttNetLogger logger) { if (logger == null) { throw new ArgumentNullException("logger"); } return new MQTTnet.Client.MqttClient(_clientAdapterFactory, logger); } public IMqttClient CreateMqttClient(IMqttClientAdapterFactory clientAdapterFactory) { if (clientAdapterFactory == null) { throw new ArgumentNullException("clientAdapterFactory"); } return new MQTTnet.Client.MqttClient(clientAdapterFactory, DefaultLogger); } public IMqttClient CreateMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory clientAdapterFactory) { if (logger == null) { throw new ArgumentNullException("logger"); } if (clientAdapterFactory == null) { throw new ArgumentNullException("clientAdapterFactory"); } return new MQTTnet.Client.MqttClient(clientAdapterFactory, logger); } public MqttServer CreateMqttServer(MqttServerOptions options) { return CreateMqttServer(options, DefaultLogger); } public MqttServer CreateMqttServer(MqttServerOptions options, IMqttNetLogger logger) { if (logger == null) { throw new ArgumentNullException("logger"); } IEnumerable<IMqttServerAdapter> serverAdapters = DefaultServerAdapters.Select((Func<MqttFactory, IMqttServerAdapter> a) => a(this)); return CreateMqttServer(options, serverAdapters, logger); } public MqttServer CreateMqttServer(MqttServerOptions options, IEnumerable<IMqttServerAdapter> serverAdapters, IMqttNetLogger logger) { if (serverAdapters == null) { throw new ArgumentNullException("serverAdapters"); } if (logger == null) { throw new ArgumentNullException("logger"); } return new MqttServer(options, serverAdapters, logger); } public MqttServer CreateMqttServer(MqttServerOptions options, IEnumerable<IMqttServerAdapter> serverAdapters) { if (serverAdapters == null) { throw new ArgumentNullException("serverAdapters"); } return new MqttServer(options, serverAdapters, DefaultLogger); } public MqttServerClientDisconnectOptionsBuilder CreateMqttServerClientDisconnectOptionsBuilder() { return new MqttServerClientDisconnectOptionsBuilder(); } public MqttServerStopOptionsBuilder CreateMqttServerStopOptionsBuilder() { return new MqttServerStopOptionsBuilder(); } public MqttServerOptionsBuilder CreateServerOptionsBuilder() { return new MqttServerOptionsBuilder(); } public MqttClientSubscribeOptionsBuilder CreateSubscribeOptionsBuilder() { return new MqttClientSubscribeOptionsBuilder(); } public MqttTopicFilterBuilder CreateTopicFilterBuilder() { return new MqttTopicFilterBuilder(); } public MqttClientUnsubscribeOptionsBuilder CreateUnsubscribeOptionsBuilder() { return new MqttClientUnsubscribeOptionsBuilder(); } public MqttFactory UseClientAdapterFactory(IMqttClientAdapterFactory clientAdapterFactory) { _clientAdapterFactory = clientAdapterFactory ?? throw new ArgumentNullException("clientAdapterFactory"); return this; } } public sealed class MqttTopicFilterBuilder { private MqttQualityOfServiceLevel _qualityOfServiceLevel; private string _topic; private bool _noLocal; private bool _retainAsPublished; private MqttRetainHandling _retainHandling; public MqttTopicFilterBuilder WithTopic(string topic) { _topic = topic; return this; } public MqttTopicFilterBuilder WithQualityOfServiceLevel(MqttQualityOfServiceLevel qualityOfServiceLevel) { _qualityOfServiceLevel = qualityOfServiceLevel; return this; } public MqttTopicFilterBuilder WithAtLeastOnceQoS() { _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce; return this; } public MqttTopicFilterBuilder WithAtMostOnceQoS() { _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; return this; } public MqttTopicFilterBuilder WithExactlyOnceQoS() { _qualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce; return this; } public MqttTopicFilterBuilder WithNoLocal(bool value = true) { _noLocal = value; return this; } public MqttTopicFilterBuilder WithRetainAsPublished(bool value = true) { _retainAsPublished = value; return this; } public MqttTopicFilterBuilder WithRetainHandling(MqttRetainHandling value) { _retainHandling = value; return this; } public MqttTopicFilter Build() { if (string.IsNullOrEmpty(_topic)) { throw new MqttProtocolViolationException("Topic is not set."); } return new MqttTopicFilter { Topic = _topic, QualityOfServiceLevel = _qualityOfServiceLevel, NoLocal = _noLocal, RetainAsPublished = _retainAsPublished, RetainHandling = _retainHandling }; } } public static class MqttTopicFilterComparer { public const char LevelSeparator = '/'; public const char MultiLevelWildcard = '#'; public const char SingleLevelWildcard = '+'; public const char ReservedTopicPrefix = '$'; public unsafe static MqttTopicFilterCompareResult Compare(string topic, string filter) { if (string.IsNullOrEmpty(topic)) { return MqttTopicFilterCompareResult.TopicInvalid; } if (string.IsNullOrEmpty(filter)) { return MqttTopicFilterCompareResult.FilterInvalid; } int num = 0; int length = filter.Length; int i = 0; int length2 = topic.Length; fixed (char* ptr2 = topic) { fixed (char* ptr = filter) { if (length > length2) { char c = ptr[length - 1]; if (c != '#' && c != '+') { return MqttTopicFilterCompareResult.NoMatch; } } bool flag = ptr[length - 1] == '#'; bool flag2 = *ptr2 == '$'; if (flag2 && length == 1 && flag) { return MqttTopicFilterCompareResult.NoMatch; } if (flag2 && *ptr == '+') { return MqttTopicFilterCompareResult.NoMatch; } if (length == 1 && flag) { return MqttTopicFilterCompareResult.IsMatch; } while (num < length && i < length2) { if (ptr[num] == '#' && num != length - 1) { return MqttTopicFilterCompareResult.FilterInvalid; } if (ptr[num] == ptr2[i]) { if (i == length2 - 1) { if (num == length - 3 && ptr[num + 1] == '/' && flag) { return MqttTopicFilterCompareResult.IsMatch; } if (num == length - 2 && ptr[num] == '/' && flag) { return MqttTopicFilterCompareResult.IsMatch; } } num++; i++; if (num == length && i == length2) { return MqttTopicFilterCompareResult.IsMatch; } if (i == length2 && num == length - 1 && ptr[num] == '+') { if (num > 0 && ptr[num - 1] != '/') { return MqttTopicFilterCompareResult.FilterInvalid; } return MqttTopicFilterCompareResult.IsMatch; } continue; } if (ptr[num] == '+') { if (num > 0 && ptr[num - 1] != '/') { return MqttTopicFilterCompareResult.FilterInvalid; } if (num < length - 1 && ptr[num + 1] != '/') { return MqttTopicFilterCompareResult.FilterInvalid; } num++; for (; i < length2 && ptr2[i] != '/'; i++) { } if (i == length2 && num == length) { return MqttTopicFilterCompareResult.IsMatch; } continue; } if (ptr[num] == '#') { if (num > 0 && ptr[num - 1] != '/') { return MqttTopicFilterCompareResult.FilterInvalid; } if (num + 1 != length) { return MqttTopicFilterCompareResult.FilterInvalid; } return MqttTopicFilterCompareResult.IsMatch; } if (num > 0 && num + 2 == length && i == length2 && ptr[num - 1] == '+' && ptr[num] == '/' && flag) { return MqttTopicFilterCompareResult.IsMatch; } return MqttTopicFilterCompareResult.NoMatch; } } } return MqttTopicFilterCompareResult.NoMatch; } } public enum MqttTopicFilterCompareResult { NoMatch, IsMatch, FilterInvalid, TopicInvalid } } namespace MQTTnet.Server { public sealed class ApplicationMessageEnqueuedEventArgs : EventArgs { public string SenderClientId { get; } public string ReceiverClientId { get; } public bool IsDropped { get; } public MqttApplicationMessage ApplicationMessage { get; } public ApplicationMessageEnqueuedEventArgs(string senderClientId, string receiverClientId, MqttApplicationMessage applicationMessage, bool isDropped) { SenderClientId = senderClientId ?? throw new ArgumentNullException("senderClientId"); ReceiverClientId = receiverClientId ?? throw new ArgumentNullException("receiverClientId"); ApplicationMessage = applicationMessage ?? throw new ArgumentNullException("applicationMessage"); IsDropped = isDropped; } } public sealed class ApplicationMessageNotConsumedEventArgs : EventArgs { public MqttApplicationMessage ApplicationMessage { get; } public string SenderId { get; } public ApplicationMessageNotConsumedEventArgs(MqttApplicationMessage applicationMessage, string senderId) { ApplicationMessage = applicationMessage ?? throw new ArgumentNullException("applicationMessage"); SenderId = senderId; } } public sealed class ClientAcknowledgedPublishPacketEventArgs : EventArgs { public MqttPacketWithIdentifier AcknowledgePacket { get; } public string ClientId { get; } public bool IsCompleted { get { if (!(AcknowledgePacket is MqttPubAckPacket)) { return AcknowledgePacket is MqttPubCompPacket; } return true; } } public MqttPublishPacket PublishPacket { get; } public IDictionary SessionItems { get; } public ClientAcknowledgedPublishPacketEventArgs(string clientId, IDictionary sessionItems, MqttPublishPacket publishPacket, MqttPacketWithIdentifier acknowledgePacket) { ClientId = clientId ?? throw new ArgumentNullException("clientId"); SessionItems = sessionItems ?? throw new ArgumentNullException("sessionItems"); PublishPacket = publishPacket ?? throw new ArgumentNullException("publishPacket"); AcknowledgePacket = acknowledgePacket ?? throw new ArgumentNullException("acknowledgePacket"); } } public sealed class ClientConnectedEventArgs : EventArgs { private readonly MqttConnectPacket _connectPacket; public byte[] AuthenticationData => _connectPacket.AuthenticationData; public string AuthenticationMethod => _connectPacket.AuthenticationMethod; public string ClientId => _connectPacket.ClientId; public string Endpoint { get; } public MqttProtocolVersion ProtocolVersion { get; } public IDictionary SessionItems { get; } public string UserName => _connectPacket.Username; public List<MqttUserProperty> UserProperties => _connectPacket?.UserProperties; public ClientConnectedEventArgs(MqttConnectPacket connectPacket, MqttProtocolVersion protocolVersion, string endpoint, IDictionary sessionItems) { _connectPacket = connectPacket ?? throw new ArgumentNullException("connectPacket"); ProtocolVersion = protocolVersion; Endpoint = endpoint; SessionItems = sessionItems ?? throw new ArgumentNullException("sessionItems"); } } public sealed class ClientDisconnectedEventArgs : EventArgs { private readonly MqttDisconnectPacket _disconnectPacket; public string ClientId { get; } public MqttClientDisconnectType DisconnectType { get; } public string Endpoint { get; } public MqttDisconnectReasonCode? ReasonCode => _disconnectPacket?.ReasonCode; public string ReasonString => _disconnectPacket?.ReasonString; public uint SessionExpiryInterval => _disconnectPacket?.SessionExpiryInterval ?? 0; public IDictionary SessionItems { get; } public List<MqttUserProperty> UserProperties => _disconnectPacket?.UserProperties; public ClientDisconnectedEventArgs(string clientId, MqttDisconnectPacket disconnectPacket, MqttClientDisconnectType disconnectType, string endpoint, IDictionary sessionItems) { ClientId = clientId ?? throw new ArgumentNullException("clientId"); DisconnectType = disconnectType; Endpoint = endpoint; SessionItems = sessionItems ?? throw new ArgumentNullException("sessionItems"); _disconnectPacket = disconnectPacket; } } public sealed class ClientSubscribedTopicEventArgs : EventArgs { public string ClientId { get; } public IDictionary SessionItems { get; } public MqttTopicFilter TopicFilter { get; } public ClientSubscribedTopicEventArgs(string clientId, MqttTopicFilter topicFilter, IDictionary sessionItems) { ClientId = clientId ?? throw new ArgumentNullException("clientId"); TopicFilter = topicFilter ?? throw new ArgumentNullException("topicFilter"); SessionItems = sessionItems ?? throw new ArgumentNullException("sessionItems"); } } public sealed class ClientUnsubscribedTopicEventArgs : EventArgs { public string ClientId { get; } public IDictionary SessionItems { get; } public string TopicFilter { get; } public ClientUnsubscribedTopicEventArgs(string clientId, string topicFilter, IDictionary sessionItems) { ClientId = clientId ?? throw new ArgumentNullException("clientId"); TopicFilter = topicFilter ?? throw new ArgumentNullException("topicFilter"); SessionItems = sessionItems ?? throw new ArgumentNullException("sessionItems"); } } public sealed class InterceptingClientApplicationMessageEnqueueEventArgs : EventArgs { public bool AcceptEnqueue { get; set; } = true; public MqttApplicationMessage ApplicationMessage { get; } public bool CloseSenderConnection { get; set; } public string ReceiverClientId { get; } public string SenderClientId { get; } public InterceptingClientApplicationMessageEnqueueEventArgs(string senderClientId, string receiverClientId, MqttApplicationMessage applicationMessage) { SenderClientId = senderClientId ?? throw new ArgumentNullException("senderClientId"); ReceiverClientId = receiverClientId ?? throw new ArgumentNullException("receiverClientId"); ApplicationMessage = applicationMessage ?? throw new ArgumentNullException("applicationMessage"); } } public sealed class InterceptingPacketEventArgs : EventArgs { public CancellationToken CancellationToken { get; } public string ClientId { get; } public string Endpoint { get; } public MqttPacket Packet { get; set; } public bool ProcessPacket { get; set; } = true; public IDictionary SessionItems { get; } public InterceptingPacketEventArgs(CancellationToken cancellationToken, string clientId, string endpoint, MqttPacket packet, IDictionary sessionItems) { CancellationToken = cancellationToken; ClientId = clientId ?? throw new ArgumentNullException("clientId"); Endpoint = endpoint; Packet = packet ?? throw new ArgumentNullException("packet"); SessionItems = sessionItems; } } public sealed class InterceptingPublishEventArgs : EventArgs { public MqttApplicationMessage ApplicationMessage { get; set; } public CancellationToken CancellationToken { get; } public string ClientId { get; } public bool CloseConnection { get; set; } public bool ProcessPublish { get; set; } = true; public PublishResponse Response { get; } = new PublishResponse(); public IDictionary SessionItems { get; } public InterceptingPublishEventArgs(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken, string clientId, IDictionary sessionItems) { ApplicationMessage = applicationMessage ?? throw new ArgumentNullException("applicationMessage"); CancellationToken = cancellationToken; ClientId = clientId ?? throw new ArgumentNullException("clientId"); SessionItems = sessionItems ?? throw new ArgumentNullException("sessionItems"); } } public sealed class InterceptingSubscriptionEventArgs : EventArgs { public CancellationToken CancellationToken { get; } public string ClientId { get; } public bool CloseConnection { get; set; } public bool ProcessSubscription { get; set; } = true; public string ReasonString { get; set; } public SubscribeResponse Response { get; } = new SubscribeResponse(); public MqttSessionStatus Session { get; } public IDictionary SessionItems => Session.Items; public MqttTopicFilter TopicFilter { get; set; } public List<MqttUserProperty> UserProperties { get; set; } public InterceptingSubscriptionEventArgs(CancellationToken cancellationToken, string clientId, MqttSessionStatus session, MqttTopicFilter topicFilter, List<MqttUserProperty> userProperties) { CancellationToken = cancellationToken; ClientId = clientId; Session = session; TopicFilter = topicFilter; UserProperties = userProperties; } } public sealed class InterceptingUnsubscriptionEventArgs : EventArgs { public CancellationToken CancellationToken { get; } public string ClientId { get; } public bool CloseConnection { get; set; } public bool ProcessUnsubscription { get; set; } = true; public UnsubscribeResponse Response { get; } = new UnsubscribeResponse(); public IDictionary SessionItems { get; } public string Topic { get; } public List<MqttUserProperty> UserProperties { get; set; } public InterceptingUnsubscriptionEventArgs(CancellationToken cancellationToken, string clientId, IDictionary sessionItems, string topic, List<MqttUserProperty> userProperties) { CancellationToken = cancellationToken; ClientId = clientId; SessionItems = sessionItems; Topic = topic; UserProperties = userProperties; } } public sealed class LoadingRetainedMessagesEventArgs : EventArgs { public List<MqttApplicationMessage> LoadedRetainedMessages { get; set; } = new List<MqttApplicationMessage>(); } public sealed class PreparingSessionEventArgs : EventArgs { public string Id { get; set; } public bool IsExistingSession { get; set; } public IDictionary<object, object> Items { get; set; } public List<MqttPublishPacket> PublishPackets { get; } = new List<MqttPublishPacket>(); private DateTime? SessionExpiryTimestamp { get; set; } public List<MqttSubscription> Subscriptions { get; } = new List<MqttSubscription>(); public uint? WillDelayInterval { get; set; } } public sealed class QueueMessageOverwrittenEventArgs : EventArgs { public MqttPacket Packet { get; } public string ReceiverClientId { get; } public QueueMessageOverwrittenEventArgs(string receiverClientId, MqttPacket packet) { ReceiverClientId = receiverClientId ?? throw new ArgumentNullException("receiverClientId"); Packet = packet ?? throw new ArgumentNullException("packet"); } } public sealed class RetainedMessageChangedEventArgs : EventArgs { public MqttApplicationMessage ChangedRetainedMessage { get; } public string ClientId { get; } public List<MqttApplicationMessage> StoredRetainedMessages { get; } public RetainedMessageChangedEventArgs(string clientId, MqttApplicationMessage changedRetainedMessage, List<MqttApplicationMessage> storedRetainedMessages) { ClientId = clientId ?? throw new ArgumentNullException("clientId"); ChangedRetainedMessage = changedRetainedMessage ?? throw new ArgumentNullException("changedRetainedMessage"); StoredRetainedMessages = storedRetainedMessages ?? throw new ArgumentNullException("storedRetainedMessages"); } } public sealed class SessionDeletedEventArgs : EventArgs { public string Id { get; } public IDictionary SessionItems { get; } public SessionDeletedEventArgs(string id, IDictionary sessionItems) { Id = id ?? throw new ArgumentNullException("id"); SessionItems = sessionItems ?? throw new ArgumentNullException("sessionItems"); } } public sealed class ValidatingConnectionEventArgs : EventArgs { private readonly MqttConnectPacket _connectPacket; public string AssignedClientIdentifier { get; set; } public byte[] AuthenticationData => _connectPacket.AuthenticationData; public string AuthenticationMethod => _connectPacket.AuthenticationMethod; public IMqttChannelAdapter ChannelAdapter { get; } public bool? CleanSession => _connectPacket.CleanSession; public X509Certificate2 ClientCertificate => ChannelAdapter.ClientCertificate; public string ClientId => _connectPacket.ClientId; public string Endpoint => ChannelAdapter.Endpoint; public bool IsSecureConnection => ChannelAdapter.IsSecureConnection; public ushort? KeepAlivePeriod => _connectPacket.KeepAlivePeriod; public uint MaximumPacketSize => _connectPacket.MaximumPacketSize; public string Password => Encoding.UTF8.GetString(RawPassword ?? EmptyBuffer.Array); public MqttProtocolVersion ProtocolVersion => ChannelAdapter.PacketFormatterAdapter.ProtocolVersion; public byte[] RawPassword => _connectPacket.Password; public MqttConnectReasonCode ReasonCode { get; set; } public string ReasonString { get; set; } public ushort ReceiveMaximum => _connectPacket.ReceiveMaximum; public bool RequestProblemInformation => _connectPacket.RequestProblemInformation; public bool RequestResponseInformation => _connectPacket.RequestResponseInformation; public byte[] ResponseAuthenticationData { get; set; } public List<MqttUserProperty> ResponseUserProperties { get; set; } public string ServerReference { get; set; } public uint SessionExpiryInterval => _connectPacket.SessionExpiryInterval; public IDictionary SessionItems { get; } public ushort TopicAliasMaximum => _connectPacket.TopicAliasMaximum; [Obsolete("This property name has a typo. Use 'UserName' instead. This one will be removed soon.")] public string Username => _connectPacket.Username; public string UserName => _connectPacket.Username; public List<MqttUserProperty> UserProperties => _connectPacket.UserProperties; public uint WillDelayInterval => _connectPacket.WillDelayInterval; public ValidatingConnectionEventArgs(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter, IDictionary sessionItems) { _connectPacket = connectPacket ?? throw new ArgumentNullException("connectPacket"); ChannelAdapter = clientAdapter ?? throw new ArgumentNullException("clientAdapter"); SessionItems = sessionItems ?? throw new ArgumentNullException("sessionItems"); } } public sealed class InjectedMqttApplicationMessage { public MqttApplicationMessage ApplicationMessage { get; } public IDictionary CustomSessionItems { get; set; } public string SenderClientId { get; set; } = string.Empty; public InjectedMqttApplicationMessage(MqttApplicationMessage applicationMessage) { ApplicationMessage = applicationMessage ?? throw new ArgumentNullException("applicationMessage"); } } public sealed class CheckSubscriptionsResult { public static CheckSubscriptionsResult NotSubscribed { get; } = new CheckSubscriptionsResult(); public bool IsSubscribed { get; set; } public bool RetainAsPublished { get; set; } public List<uint> SubscriptionIdentifiers { get; set; } public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } } public sealed class DispatchApplicationMessageResult { public bool CloseConnection { get; } public int ReasonCode { get; } public string ReasonString { get; } public List<MqttUserProperty> UserProperties { get; } public DispatchApplicationMessageResult(int reasonCode, bool closeConnection, string reasonString, List<MqttUserProperty> userProperties) { ReasonCode = reasonCode; CloseConnection = closeConnection; ReasonString = reasonString; UserProperties = userProperties; } } public enum EnqueueDataPacketResult { Enqueued, Dropped } public interface ISubscriptionChangedNotification { void OnSubscriptionsAdded(MqttSession clientSession, List<string> subscriptionsTopics); void OnSubscriptionsRemoved(MqttSession clientSession, List<string> subscriptionTopics); } public sealed class MqttClient : IDisposable { private readonly MqttConnectPacket _connectPacket; private readonly MqttServerEventContainer _eventContainer; private readonly MqttNetSourceLogger _logger; private readonly MqttServerOptions _serverOptions; private readonly MqttClientSessionsManager _sessionsManager; private readonly Dictionary<ushort, string> _topicAlias = new Dictionary<ushort, string>(); private CancellationTokenSource _cancellationToken = new CancellationTokenSource(); private bool _disconnectPacketSent; public IMqttChannelAdapter ChannelAdapter { get; } public MqttDisconnectPacket DisconnectPacket { get; set; } public string Endpoint { get; } public string Id => _connectPacket.ClientId; public bool IsRunning { get; private set; } public bool IsTakenOver { get; set; } public ushort KeepAlivePeriod => _connectPacket.KeepAlivePeriod; public MqttSession Session { get; } public MqttClientStatistics Statistics { get; } = new MqttClientStatistics(); public MqttClient(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter, MqttSession session, MqttServerOptions serverOptions, MqttServerEventContainer eventContainer, MqttClientSessionsManager sessionsManager, IMqttNetLogger logger) { _serverOptions = serverOptions ?? throw new ArgumentNullException("serverOptions"); _eventContainer = eventContainer ?? throw new ArgumentNullException("eventContainer"); _sessionsManager = sessionsManager ?? throw new ArgumentNullException("sessionsManager"); _connectPacket = connectPacket ?? throw new ArgumentNullException("connectPacket"); ChannelAdapter = channelAdapter ?? throw new ArgumentNullException("channelAdapter"); Endpoint = channelAdapter.Endpoint; Session = session ?? throw new ArgumentNullException("session"); if (logger == null) { throw new ArgumentNullException("logger"); } _logger = logger.WithSource("MqttClient"); } public void Dispose() { _cancellationToken?.Dispose(); } public void ResetStatistics() { ChannelAdapter.ResetStatistics(); Statistics.ResetStatistics(); } public async Task RunAsync() { _logger.Info("Client '{0}': Session started", Id); Session.LatestConnectPacket = _connectPacket; Session.WillMessageSent = false; try { CancellationToken cancellationToken = _cancellationToken.Token; IsRunning = true; Task.Factory.StartNew(() => SendPacketsLoop(cancellationToken), cancellationToken, TaskCreationOptions.PreferFairness, TaskScheduler.Default).ConfigureAwait(continueOnCapturedContext: false); await ReceivePackagesLoop(cancellationToken).ConfigureAwait(continueOnCapturedContext: false); } finally { IsRunning = false; Session.DisconnectedTimestamp = DateTime.UtcNow; _cancellationToken?.TryCancel(); _cancellationToken?.Dispose(); _cancellationToken = null; } bool flag = DisconnectPacket != null; if (!IsTakenOver && !flag && Session.LatestConnectPacket.WillFlag && !Session.WillMessageSent) { MqttApplicationMessage applicationMessage = MqttApplicationMessageFactory.Create(MqttPacketFactories.Publish.Create(Session.LatestConnectPacket)); _sessionsManager.DispatchApplicationMessage(Id, Session.Items, applicationMessage, CancellationToken.None); Session.WillMessageSent = true; _logger.Info("Client '{0}': Published will message", Id); } _logger.Info("Client '{0}': Connection stopped", Id); } public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellationToken) { packet = await InterceptPacketAsync(packet, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); if (packet != null) { await ChannelAdapter.SendPacketAsync(packet, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); Statistics.HandleSentPacket(packet); } } public async Task StopAsync(MqttServerClientDisconnectOptions disconnectOptions) { IsRunning = false; if (!_disconnectPacketSent && ChannelAdapter.PacketFormatterAdapter.ProtocolVersion == MqttProtocolVersion.V500 && disconnectOptions != null && (disconnectOptions.ReasonCode != 0 || (disconnectOptions.UserProperties?.Any() ?? false) || !string.IsNullOrEmpty(disconnectOptions.ReasonString) || !string.IsNullOrEmpty(disconnectOptions.ServerReference))) { await TrySendDisconnectPacket(disconnectOptions).ConfigureAwait(continueOnCapturedContext: false); } StopInternal(); } private Task ClientAcknowledgedPublishPacket(MqttPublishPacket publishPacket, MqttPacketWithIdentifier acknowledgePacket) { if (_eventContainer.ClientAcknowledgedPublishPacketEvent.HasHandlers) { ClientAcknowledgedPublishPacketEventArgs eventArgs = new ClientAcknowledgedPublishPacketEventArgs(Id, Session.Items, publishPacket, acknowledgePacket); return _eventContainer.ClientAcknowledgedPublishPacketEvent.TryInvokeAsync(eventArgs, _logger); } return CompletedTask.Instance; } private void HandleIncomingPingReqPacket() { Session.EnqueueHealthPacket(new MqttPacketBusItem(MqttPingRespPacket.Instance)); } private Task HandleIncomingPubAckPacket(MqttPubAckPacket pubAckPacket) { MqttPublishPacket mqttPublishPacket = Session.AcknowledgePublishPacket(pubAckPacket.PacketIdentifier); if (mqttPublishPacket != null) { return ClientAcknowledgedPublishPacket(mqttPublishPacket, pubAckPacket); } return CompletedTask.Instance; } private Task HandleIncomingPubCompPacket(MqttPubCompPacket pubCompPacket) { MqttPublishPacket mqttPublishPacket = Session.AcknowledgePublishPacket(pubCompPacket.PacketIdentifier); if (mqttPublishPacket != null) { return ClientAcknowledgedPublishPacket(mqttPublishPacket, pubCompPacket); } return CompletedTask.Instance; } private async Task HandleIncomingPublishPacket(MqttPublishPacket publishPacket, CancellationToken cancellationToken) { HandleTopicAlias(publishPacket); MqttApplicationMessage applicationMessage = MqttApplicationMessageFactory.Create(publishPacket); DispatchApplicationMessageResult dispatchApplicationMessageResult = await _sessionsManager.DispatchApplicationMessage(Id, Session.Items, applicationMessage, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); if (dispatchApplicationMessageResult.CloseConnection) { await StopAsync(new MqttServerClientDisconnectOptions { ReasonCode = MqttDisconnectReasonCode.UnspecifiedError }); return; } switch (publishPacket.QualityOfServiceLevel) { case MqttQualityOfServiceLevel.AtLeastOnce: { MqttPubAckPacket packet2 = MqttPacketFactories.PubAck.Create(publishPacket, dispatchApplicationMessageResult); Session.EnqueueControlPacket(new MqttPacketBusItem(packet2)); break; } case MqttQualityOfServiceLevel.ExactlyOnce: { MqttPacket packet = MqttPacketFactories.PubRec.Create(publishPacket, dispatchApplicationMessageResult); Session.EnqueueControlPacket(new MqttPacketBusItem(packet)); break; } default: throw new MqttCommunicationException("Received a not supported QoS level"); case MqttQualityOfServiceLevel.AtMostOnce: break; } } private Task HandleIncomingPubRecPacket(MqttPubRecPacket pubRecPacket) { MqttPubRelPacket packet = MqttPacketFactories.PubRel.Create(pubRecPacket, MqttApplicationMessageReceivedReasonCode.Success); Session.EnqueueControlPacket(new MqttPacketBusItem(packet)); return CompletedTask.Instance; } private void HandleIncomingPubRelPacket(MqttPubRelPacket pubRelPacket) { MqttPubCompPacket packet = MqttPacketFactories.PubComp.Create(pubRelPacket, MqttApplicationMessageReceivedReasonCode.Success); Session.EnqueueControlPacket(new MqttPacketBusItem(packet)); } private async Task HandleIncomingSubscribePacket(MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) { SubscribeResult subscribeResult = await Session.Subscribe(subscribePacket, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); MqttSubAckPacket packet = MqttPacketFactories.SubAck.Create(subscribePacket, subscribeResult); Session.EnqueueControlPacket(new MqttPacketBusItem(packet)); if (subscribeResult.CloseConnection) { StopInternal(); } else { if (subscribeResult.RetainedMessages == null) { return; } foreach (MqttRetainedMessageMatch retainedMessage in subscribeResult.RetainedMessages) { MqttPublishPacket packet2 = MqttPacketFactories.Publish.Create(retainedMessage); Session.EnqueueDataPacket(new MqttPacketBusItem(packet2)); } } } private async Task HandleIncomingUnsubscribePacket(MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken) { UnsubscribeResult unsubscribeResult = await Session.Unsubscribe(unsubscribePacket, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); MqttUnsubAckPacket packet = MqttPacketFactories.UnsubAck.Create(unsubscribePacket, unsubscribeResult); Session.EnqueueControlPacket(new MqttPacketBusItem(packet)); if (unsubscribeResult.CloseConnection) { StopInternal(); } } private void HandleTopicAlias(MqttPublishPacket publishPacket) { if (publishPacket.TopicAlias == 0) { return; } lock (_topicAlias) { string value; if (!string.IsNullOrEmpty(publishPacket.Topic)) { _topicAlias[publishPacket.TopicAlias] = publishPacket.Topic; } else if (_topicAlias.TryGetValue(publishPacket.TopicAlias, out value)) { publishPacket.Topic = value; } else { _logger.Warning("Client '{0}': Received invalid topic alias ({1})", Id, publishPacket.TopicAlias); } } } private async Task<MqttPacket> InterceptPacketAsync(MqttPacket packet, CancellationToken cancellationToken) { if (!_eventContainer.InterceptingOutboundPacketEvent.HasHandlers) { return packet; } InterceptingPacketEventArgs interceptingPacketEventArgs = new InterceptingPacketEventArgs(cancellationToken, Id, Endpoint, packet, Session.Items); await _eventContainer.InterceptingOutboundPacketEvent.InvokeAsync(interceptingPacketEventArgs).ConfigureAwait(continueOnCapturedContext: false); if (!interceptingPacketEventArgs.ProcessPacket || packet == null) { return null; } return interceptingPacketEventArgs.Packet; } private async Task ReceivePackagesLoop(CancellationToken cancellationToken) { MqttPacket currentPacket = null; try { while (!cancellationToken.IsCancellationRequested) { await Task.Yield(); currentPacket = await ChannelAdapter.ReceivePacketAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false); if (currentPacket == null || cancellationToken.IsCancellationRequested || IsTakenOver || !IsRunning) { break; } bool flag = true; if (_eventContainer.InterceptingInboundPacketEvent.HasHandlers) { InterceptingPacketEventArgs interceptingPacketEventArgs = new InterceptingPacketEventArgs(cancellationToken, Id, Endpoint, currentPacket, Session.Items); await _eventContainer.InterceptingInboundPacketEvent.InvokeAsync(interceptingPacketEventArgs).ConfigureAwait(continueOnCapturedContext: false); currentPacket = interceptingPacketEventArgs.Packet; flag = interceptingPacketEventArgs.ProcessPacket; } if (!flag || currentPacket == null) { continue; } Statistics.HandleReceivedPacket(currentPacket); if (currentPacket is MqttPublishPacket publishPacket) { await HandleIncomingPublishPacket(publishPacket, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); continue; } if (currentPacket is MqttPubAckPacket pubAckPacket) { await HandleIncomingPubAckPacket(pubAckPacket).ConfigureAwait(continueOnCapturedContext: false); continue; } if (currentPacket is MqttPubCompPacket pubCompPacket) { await HandleIncomingPubCompPacket(pubCompPacket).ConfigureAwait(continueOnCapturedContext: false); continue; } if (currentPacket is MqttPubRecPacket pubRecPacket) { await HandleIncomingPubRecPacket(pubRecPacket).ConfigureAwait(continueOnCapturedContext: false); continue; } if (currentPacket is MqttPubRelPacket pubRelPacket) { HandleIncomingPubRelPacket(pubRelPacket); continue; } if (currentPacket is MqttSubscribePacket subscribePacket) { await HandleIncomingSubscribePacket(subscribePacket, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); continue; } if (currentPacket is MqttUnsubscribePacket unsubscribePacket) { await HandleIncomingUnsubscribePacket(unsubscribePacket, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); continue; } if (currentPacket is MqttPingReqPacket) { HandleIncomingPingReqPacket(); continue; } if (currentPacket is MqttPingRespPacket) { throw new MqttProtocolViolationException("A PINGRESP Packet is sent by the Server to the Client in response to a PINGREQ Packet only."); } if (currentPacket is MqttDisconnectPacket disconnectPacket) { DisconnectPacket = disconnectPacket; break; } throw new MqttProtocolViolationException("Packet not allowed"); } } catch (OperationCanceledException) { } catch (Exception ex2) { if (ex2 is MqttCommunicationException) { _logger.Warning(ex2, "Client '{0}': Communication exception while receiving packets", Id); return; } MqttNetLogLevel logLevel = MqttNetLogLevel.Error; if (!IsRunning) { logLevel = MqttNetLogLevel.Warning; } if (currentPacket == null) { _logger.Publish(logLevel, ex2, "Client '{0}': Error while receiving packets", Id); } else { _logger.Publish(logLevel, ex2, "Client '{0}': Error while processing {1} packet", Id, currentPacket.GetRfcName()); } } } private async Task SendPacketsLoop(CancellationToken cancellationToken) { MqttPacketBusItem packetBusItem = null; try { while (!cancellationToken.IsCancellationRequested && !IsTakenOver && IsRunning) { packetBusItem = await Session.DequeuePacketAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false); if (cancellationToken.IsCancellationRequested || IsTakenOver || !IsRunning) { break; } try { _ = 1; try { await SendPacketAsync(packetBusItem.Packet, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); packetBusItem.Complete(); } catch (OperationCanceledException) { packetBusItem.Cancel(); } catch (Exception exception) { packetBusItem.Fail(exception); } } finally { await Task.Yield(); } } } catch (OperationCanceledException) { } catch (Exception ex3) { if (ex3 is MqttCommunicationTimedOutException) { _logger.Warning(ex3, "Client '{0}': Sending PUBLISH packet failed due to timeout", Id); } else if (ex3 is MqttCommunicationException) { _logger.Warning(ex3, "Client '{0}': Sending PUBLISH packet failed due to communication exception", Id); } else { _logger.Error(ex3, "Client '{0}': Sending PUBLISH packet failed", Id); } if (packetBusItem?.Packet is MqttPublishPacket mqttPublishPacket && mqttPublishPacket.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) { mqttPublishPacket.Dup = true; Session.EnqueueDataPacket(new MqttPacketBusItem(mqttPublishPacket)); } StopInternal(); } } private void StopInternal() { _cancellationToken?.TryCancel(); } private async Task TrySendDisconnectPacket(MqttServerClientDisconnectOptions options) { try { _disconnectPacketSent = true; MqttDisconnectPacket packet = MqttPacketFactories.Disconnect.Create(options); using CancellationTokenSource timeout = new CancellationTokenSource(_serverOptions.DefaultCommunicationTimeout); await SendPacketAsync(packet, timeout.Token).ConfigureAwait(continueOnCapturedContext: false); } catch (Exception exception) { _logger.Warning(exception, "Client '{0}': Error while sending DISCONNECT packet (ReasonCode = {1})", Id, options.ReasonCode); } } } public sealed class MqttClientSessionsManager : ISubscriptionChangedNotification, IDisposable { private readonly Dictionary<string, MqttClient> _clients = new Dictionary<string, MqttClient>(4096); private readonly AsyncLock _createConnectionSyncRoot = new AsyncLock(); private readonly MqttServerEventContainer _eventContainer; private readonly MqttNetSourceLogger _logger; private readonly MqttServerOptions _options; private readonly MqttRetainedMessagesManager _retainedMessagesManager; private readonly IMqttNetLogger _rootLogger; private readonly ReaderWriterLockSlim _sessionsManagementLock = new ReaderWriterLockSlim(); private readonly MqttSessionsStorage _sessionsStorage = new MqttSessionsStorage(); private readonly HashSet<MqttSession> _subscriberSessions = new HashSet<MqttSession>(); public MqttClientSessionsManager(MqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, MqttServerEventContainer eventContainer, IMqttNetLogger logger) { if (logger == null) { throw new ArgumentNullException("logger"); } _logger = logger.WithSource("MqttClientSessionsManager"); _rootLogger = logger; _options = options ?? throw new ArgumentNullException("options"); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException("retainedMessagesManager"); _eventContainer = eventContainer ?? throw new ArgumentNullException("eventContainer"); } public async Task CloseAllConnections(MqttServerClientDisconnectOptions options) { if (options == null) { throw new ArgumentNullException("options"); } List<MqttClient> list; lock (_clients) { list = _clients.Values.ToList(); _clients.Clear(); } foreach (MqttClient item in list) { await item.StopAsync(options).ConfigureAwait(continueOnCapturedContext: false); } } public async Task DeleteSessionAsync(string clientId) { _logger.Verbose("Deleting session for client '{0}'.", clientId); MqttClient value; lock (_clients) { _clients.TryGetValue(clientId, out value); } _sessionsManagementLock.EnterWriteLock(); MqttSession session; try { if (_sessionsStorage.TryRemoveSession(clientId, out session)) { _subscriberSessions.Remove(session); } } finally { _sessionsManagementLock.ExitWriteLock(); } try { if (value != null) { await value.StopAsync(new MqttServerClientDisconnectOptions { ReasonCode = MqttDisconnectReasonCode.NormalDisconnection }).ConfigureAwait(continueOnCapturedContext: false); } } catch (Exception exception) { _logger.Error(exception, "Error while deleting session '{0}'", clientId); } try { if (_eventContainer.SessionDeletedEvent.HasHandlers && session != null) { SessionDeletedEventArgs eventArgs = new SessionDeletedEventArgs(clientId, session.Items); await _eventContainer.SessionDeletedEvent.TryInvokeAsync(eventArgs, _logger).ConfigureAwait(continueOnCapturedContext: false); } } catch (Exception exception2) { _logger.Error(exception2, "Error while executing session deleted event for session '{0}'", clientId); } session?.Dispose(); _logger.Verbose("Session of client '{0}' deleted", clientId); } public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(string senderId, IDictionary senderSessionItems, MqttApplicationMessage applicationMessage, CancellationToken cancellationToken) { bool flag = true; bool closeConnection = false; string reasonString = null; List<MqttUserProperty> userProperties = null; int reasonCode = 0; if (_eventContainer.InterceptingPublishEvent.HasHandlers) { InterceptingPublishEventArgs interceptingPublishEventArgs = new InterceptingPublishEventArgs(applicationMessage, cancellationToken, senderId, senderSessionItems); if (string.IsNullOrEmpty(interceptingPublishEventArgs.ApplicationMessage.Topic)) { interceptingPublishEventArgs.Response.ReasonCode = MqttPubAckReasonCode.TopicNameInvalid; interceptingPublishEventArgs.ProcessPublish = false; } await _eventContainer.InterceptingPublishEvent.InvokeAsync(interceptingPublishEventArgs).ConfigureAwait(continueOnCapturedContext: false); applicationMessage = interceptingPublishEventArgs.ApplicationMessage; closeConnection = interceptingPublishEventArgs.CloseConnection; flag = interceptingPublishEventArgs.ProcessPublish; reasonString = interceptingPublishEventArgs.Response.ReasonString; userProperties = interceptingPublishEventArgs.Response.UserProperties; reasonCode = (int)interceptingPublishEventArgs.Response.ReasonCode; } if (flag && applicationMessage != null) { int matchingSubscribersCount = 0; try { if (applicationMessage.Retain) { await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(continueOnCapturedContext: false); } _sessionsManagementLock.EnterReadLock(); List<MqttSession> list; try { list = _subscriberSessions.ToList(); } finally { _sessionsManagementLock.ExitReadLock(); } MqttTopicHash.Calculate(applicationMessage.Topic, out var topicHash, out var _, out var _); foreach (MqttSession session in list) { if (!session.TryCheckSubscriptions(applicationMessage.Topic, topicHash, applicationMessage.QualityOfServiceLevel, senderId, out var checkSubscriptionsResult) || !checkSubscriptionsResult.IsSubscribed) { continue; } if (_eventContainer.InterceptingClientEnqueueEvent.HasHandlers) { InterceptingClientApplicationMessageEnqueueEventArgs eventArgs = new InterceptingClientApplicationMessageEnqueueEventArgs(senderId, session.Id, applicationMessage); await _eventContainer.InterceptingClientEnqueueEvent.InvokeAsync(eventArgs).ConfigureAwait(continueOnCapturedContext: false); if (!eventArgs.AcceptEnqueue) { continue; } } MqttPublishPacket mqttPublishPacket = MqttPacketFactories.Publish.Create(applicationMessage); mqttPublishPacket.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel; mqttPublishPacket.SubscriptionIdentifiers = checkSubscriptionsResult.SubscriptionIdentifiers; if (mqttPublishPacket.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) { mqttPublishPacket.PacketIdentifier = session.PacketIdentifierProvider.GetNextPacketIdentifier(); } if (checkSubscriptionsResult.RetainAsPublished) { mqttPublishPacket.Retain = applicationMessage.Retain; } else { mqttPublishPacket.Retain = false; } matchingSubscribersCount++; EnqueueDataPacketResult enqueueDataPacketResult = session.EnqueueDataPacket(new MqttPacketBusItem(mqttPublishPacket)); if (_eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.HasHandlers) { ApplicationMessageEnqueuedEventArgs eventArgs2 = new ApplicationMessageEnqueuedEventArgs(senderId, session.Id, applicationMessage, enqueueDataPacketResult == EnqueueDataPacketResult.Dropped); await _eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.InvokeAsync(eventArgs2).ConfigureAwait(continueOnCapturedContext: false); } _logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'", session.Id, applicationMessage.Topic); checkSubscriptionsResult = null; } if (matchingSubscribersCount == 0) { if (reasonCode == 0) { reasonCode = 16; } await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(continueOnCapturedContext: false); } } catch (Exception exception) { _logger.Error(exception, "Error while processing next queued application message"); } } return new DispatchApplicationMessageResult(reasonCode, closeConnection, reasonString, userProperties); } public void Dispose() { _createConnectionSyncRoot.Dispose(); _sessionsManagementLock.EnterWriteLock(); try { _sessionsStorage.Dispose(); } finally { _sessionsManagementLock.ExitWriteLock(); } _sessionsManagementLock?.Dispose(); } public MqttClient GetClient(string id) { lock (_clients) { if (!_clients.TryGetValue(id, out var value)) { throw new InvalidOperationException("Client with ID '" + id + "' not found."); } return value; } } public List<MqttClient> GetClients() { lock (_clients) { return _clients.Values.ToList(); } } public Task<IList<MqttClientStatus>> GetClientsStatus() { List<MqttClientStatus> list = new List<MqttClientStatus>(); lock (_clients) { foreach (MqttClient value in _clients.Values) { MqttClientStatus item = new MqttClientStatus(value) { Session = new MqttSessionStatus(value.Session) }; list.Add(item); } } return Task.FromResult((IList<MqttClientStatus>)list); } public Task<IList<MqttSessionStatus>> GetSessionsStatus() { List<MqttSessionStatus> list = new List<MqttSessionStatus>(); _sessionsManagementLock.EnterReadLock(); try { foreach (MqttSession item2 in _sessionsStorage.ReadAllSessions()) { MqttSessionStatus item = new MqttSessionStatus(item2); list.Add(item); } } finally { _sessionsManagementLock.ExitReadLock(); } return Task.FromResult((IList<MqttSessionStatus>)list); } public async Task HandleClientConnectionAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) { MqttClient client = null; try { _ = 6; try { MqttConnectPacket connectPacket = await ReceiveConnectPacket(channelAdapter, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); if (connectPacket == null) { goto end_IL_0079; } ValidatingConnectionEventArgs validatingConnectionEventArgs = await ValidateConnection(connectPacket, channelAdapter).ConfigureAwait(continueOnCapturedContext: false); MqttConnAckPacket connAckPacket = MqttPacketFactories.ConnAck.Create(validatingConnectionEventArgs); if (validatingConnectionEventArgs.ReasonCode != 0) { await channelAdapter.SendPacketAsync(connAckPacket, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); goto end_IL_0079; } client = await CreateClientConnection(connectPacket, connAckPacket, channelAdapter, validatingConnectionEventArgs).ConfigureAwait(continueOnCapturedContext: false); await client.SendPacketAsync(connAckPacket, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); if (_eventContainer.ClientConnectedEvent.HasHandlers) { ClientConnectedEventArgs eventArgs = new ClientConnectedEventArgs(connectPacket, channelAdapter.PacketFormatterAdapter.ProtocolVersion, channelAdapter.Endpoint, client.Session.Items); await _eventContainer.ClientConnectedEvent.TryInvokeAsync(eventArgs, _logger).ConfigureAwait(continueOnCapturedContext: false); } await client.RunAsync().ConfigureAwait(continueOnCapturedContext: false); goto end_IL_0052; end_IL_0079:; } catch (ObjectDisposedException) { goto end_IL_0052; } catch (OperationCanceledException) { goto end_IL_0052; } catch (Exception ex3) { _logger.Error(ex3, ex3.Message); goto end_IL_0052; } end_IL_0052:; } finally { if (client != null) { if (client.Id != null && !client.IsTakenOver) { lock (_clients) { _clients.Remove(client.Id); } if (!_options.EnablePersistentSessions || !client.Session.IsPersistent) { await DeleteSessionAsync(client.Id).ConfigureAwait(continueOnCapturedContext: false); } } string endpoint = client.Endpoint; if (client.Id != null && !client.IsTakenOver && _eventContainer.ClientDisconnectedEvent.HasHandlers) { MqttClientDisconnectType disconnectType = ((client.DisconnectPacket == null) ? MqttClientDisconnectType.NotClean : MqttClientDisconnectType.Clean); ClientDisconnectedEventArgs eventArgs2 = new ClientDisconnectedEventArgs(client.Id, client.DisconnectPacket, disconnectType, endpoint, client.Session.Items); await _eventContainer.ClientDisconnectedEvent.InvokeAsync(eventArgs2).ConfigureAwait(continueOnCapturedContext: false); } } using CancellationTokenSource timeout = new CancellationTokenSource(_options.DefaultCommunicationTimeout); await channelAdapter.DisconnectAsync(timeout.Token).ConfigureAwait(continueOnCapturedContext: false); } } public void OnSubscriptionsAdded(MqttSession clientSession, List<string> topics) { _sessionsManagementLock.EnterWriteLock(); try { if (!clientSession.HasSubscribedTopics) { _subscriberSessions.Add(clientSession); } foreach (string topic in topics) { clientSession.AddSubscribedTopic(topic); } } finally { _sessionsManagementLock.ExitWriteLock(); } } public void OnSubscriptionsRemoved(MqttSession clientSession, List<string> subscriptionTopics) { _sessionsManagementLock.EnterWriteLock(); try { foreach (string subscriptionTopic in subscriptionTopics) { clientSession.RemoveSubscribedTopic(subscriptionTopic); } if (!clientSession.HasSubscribedTopics) { _subscriberSessions.Remove(clientSession); } } finally { _sessionsManagementLock.ExitWriteLock(); } } public void Start() { if (!_options.EnablePersistentSessions) { _sessionsStorage.Clear(); } } public async Task SubscribeAsync(string clientId, ICollection<MqttTopicFilter> topicFilters) { if (clientId == null) { throw new ArgumentNullException("clientId"); } if (topicFilters == null) { throw new ArgumentNullException("topicFilters"); } MqttSubscribePacket mqttSubscribePacket = new MqttSubscribePacket(); mqttSubscribePacket.TopicFilters.AddRange(topicFilters); MqttSession clientSession = GetClientSession(clientId); SubscribeResult subscribeResult = await clientSession.Subscribe(mqttSubscribePacket, CancellationToken.None).ConfigureAwait(continueOnCapturedContext: false); if (subscribeResult.RetainedMessages == null) { return; } foreach (MqttRetainedMessageMatch retainedMessage in subscribeResult.RetainedMessages) { MqttPublishPacket packet = MqttPacketFactories.Publish.Create(retainedMessage); clientSession.EnqueueDataPacket(new MqttPacketBusItem(packet)); } } public Task UnsubscribeAsync(string clientId, ICollection<string> topicFilters) { if (clientId == null) { throw new ArgumentNullException("clientId"); } if (topicFilters == null) { throw new ArgumentNullException("topicFilters"); } MqttUnsubscribePacket mqttUnsubscribePacket = new MqttUnsubscribePacket(); mqttUnsubscribePacket.TopicFilters.AddRange(topicFilters); return GetClientSession(clientId).Unsubscribe(mqttUnsubscribePacket, CancellationToken.None); } private MqttClient CreateClient(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter, MqttSession session) { return new MqttClient(connectPacket, channelAdapter, session, _options, _eventContainer, this, _rootLogger); } private async Task<MqttClient> CreateClientConnection(MqttConnectPacket connectPacket, MqttConnAckPacket connAckPacket, IMqttChannelAdapter channelAdapter, ValidatingConnectionEventArgs validatingConnectionEventArgs) { bool sessionShouldPersist = ((validatingConnectionEventArgs.ProtocolVersion != MqttProtocolVersion.V500) ? (!connectPacket.CleanSession) : (validatingConnectionEventArgs.SessionExpiryInterval != 0)); MqttClient client; using (await _createConnectionSyncRoot.EnterAsync().ConfigureAwait(continueOnCapturedContext: false)) { _sessionsManagementLock.EnterWriteLock(); MqttSession oldSession; MqttClient oldClient; try { MqttSession mqttSession; if (!_sessionsStorage.TryGetSession(connectPacket.ClientId, out oldSession)) { mqttSession = CreateSession(connectPacket, validatingConnectionEventArgs, sessionShouldPersist); } else if (connectPacket.CleanSession) { _logger.Verbose("Deleting existing session of client '{0}' due to clean start", connectPacket.ClientId); _subscriberSessions.Remove(oldSession); mqttSession = CreateSession(connectPacket, validatingConnectionEventArgs, sessionShouldPersist); } else { _logger.Verbose("Reusing existing session of client '{0}'", connectPacket.ClientId); mqttSession = oldSession; oldSession = null; mqttSession.IsPersistent = sessionShouldPersist; mqttSession.DisconnectedTimestamp = null; mqttSession.Recover(); connAckPacket.IsSessionPresent = true; } _sessionsStorage.UpdateSession(connectPacket.ClientId, mqttSession); lock (_clients) { _clients.TryGetValue(connectPacket.ClientId, out oldClient); if (oldClient != null) { oldClient.IsTakenOver = true; } client = CreateClient(connectPacket, channelAdapter, mqttSession); _clients[connectPacket.ClientId] = client; } } finally { _sessionsManagementLock.ExitWriteLock(); } if (!connAckPacket.IsSessionPresent) { PreparingSessionEventArgs eventArgs = new PreparingSessionEventArgs(); await _eventContainer.PreparingSessionEvent.TryInvokeAsync(eventArgs, _logger).ConfigureAwait(continueOnCapturedContext: false); } if (oldClient != null) { await oldClient.StopAsync(new MqttServerClientDisconnectOptions { ReasonCode = MqttDisconnectReasonCode.SessionTakenOver }).ConfigureAwait(continueOnCapturedContext: false); if (_eventContainer.ClientDisconnectedEvent.HasHandlers) { ClientDisconnectedEventArgs eventArgs2 = new ClientDisconnectedEventArgs(oldClient.Id, null, MqttClientDisconnectType.Takeover, oldClient.Endpoint, oldClient.Session.Items); await _eventContainer.ClientDisconnectedEvent.TryInvokeAsync(eventArgs2, _logger).ConfigureAwait(continueOnCapturedContext: false); } } oldSession?.Dispose(); } return client; } private MqttSession CreateSession(MqttConnectPacket connectPacket, ValidatingConnectionEventArgs validatingConnectionEventArgs, bool isPersistent) { _logger.Verbose("Created new session for client '{0}'", connectPacket.ClientId); return new MqttSession(connectPacket, validatingConnectionEventArgs.SessionItems, _options, _eventContainer, _retainedMessagesManager, this) { IsPersistent = isPersistent }; } private async Task FireApplicationMessageNotConsumedEvent(MqttApplicationMessage applicationMessage, string senderId) { if (_eventContainer.ApplicationMessageNotConsumedEvent.HasHandlers) { ApplicationMessageNotConsumedEventArgs eventArgs = new ApplicationMessageNotConsumedEventArgs(applicationMessage, senderId); await _eventContainer.ApplicationMessageNotConsumedEvent.InvokeAsync(eventArgs).ConfigureAwait(continueOnCapturedContext: false); } } private MqttSession GetClientSession(string clientId) { _sessionsManagementLock.EnterReadLock(); try { if (!_sessionsStorage.TryGetSession(clientId, out var session)) { throw new InvalidOperationException("Client session '" + clientId + "' is unknown."); } return session; } finally { _sessionsManagementLock.ExitReadLock(); } } private async Task<MqttConnectPacket> ReceiveConnectPacket(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) { try { using CancellationTokenSource timeoutToken = new CancellationTokenSource(_options.DefaultCommunicationTimeout); using CancellationTokenSource effectiveCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(timeoutToken.Token, cancellationToken); if (await channelAdapter.ReceivePacketAsync(effectiveCancellationToken.Token).ConfigureAwait(continueOnCapturedContext: false) is MqttConnectPacket result) { return result; } } catch (OperationCanceledException) { _logger.Warning("Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.Endpoint); } catch (MqttCommunicationTimedOutException) { _logger.Warning("Client '{0}': Connected but did not sent a CONNECT packet.", channelAdapter.Endpoint); } _logger.Warning("Client '{0}': First received packet was no 'CONNECT' packet [MQTT-3.1.0-1].", channelAdapter.Endpoint); return null; } private async Task<ValidatingConnectionEventArgs> ValidateConnection(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter) { ConcurrentDictionary<object, object> sessionItems = new ConcurrentDictionary<object, object>(); ValidatingConnectionEventArgs eventArgs = new ValidatingConnectionEventArgs(connectPacket, channelAdapter, sessionItems); await _eventContainer.ValidatingConnectionEvent.InvokeAsync(eventArgs).ConfigureAwait(continueOnCapturedContext: false); if (string.IsNullOrEmpty(connectPacket.ClientId) && channelAdapter.PacketFormatterAdapter.ProtocolVersion == MqttProtocolVersion.V500) { connectPacket.ClientId = eventArgs.AssignedClientIdentifier; } if (string.IsNullOrEmpty(connectPacket.ClientId)) { eventArgs.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid; } return eventArgs; } } public sealed class MqttClientStatistics { private struct Statistics { public long _receivedPacketsCount; public long _sentPacketsCount; public long _receivedApplicationMessagesCount; public long _sentApplicationMessagesCount; public void Reset() { Volatile.Write(ref _receivedPacketsCount, 0L); Volatile.Write(ref _sentPacketsCount, 0L); Volatile.Write(ref _receivedApplicationMessagesCount, 0L); Volatile.Write(ref _sentApplicationMessagesCount, 0L); } } private Statistics _statistics = new Statistics { _receivedPacketsCount = 1L, _sentPacketsCount = 1L }; public DateTime ConnectedTimestamp { get; } public DateTime LastNonKeepAlivePacketReceivedTimestamp { get; private set; } public DateTime LastPacketReceivedTimestamp { get; private set; } public DateTime LastPacketSentTimestamp { get; private set; } public long SentApplicationMessagesCount => Volatile.Read(ref _statistics._sentApplicationMessagesCount); public long ReceivedApplicationMessagesCount => Volatile.Read(ref _statistics._receivedApplicationMessagesCount); public long SentPacketsCount => Volatile.Read(ref _statistics._sentPacketsCount); public long ReceivedPacketsCount => Volatile.Read(ref _statistics._receivedPacketsCount); public MqttClientStatistics() { ConnectedTimestamp = DateTime.UtcNow; LastPacketReceivedTimestamp = ConnectedTimestamp; LastPacketSentTimestamp = ConnectedTimestamp; LastNonKeepAlivePacketReceivedTimestamp = ConnectedTimestamp; } public void HandleReceivedPacket(MqttPacket packet) { if (packet == null) { throw new ArgumentNullException("packet"); } LastPacketSentTimestamp = DateTime.UtcNow; Interlocked.Increment(ref _statistics._sentPacketsCount); if (packet is MqttPublishPacket) { Interlocked.Increment(ref _statistics._sentApplicationMessagesCount); } if (!(packet is MqttPingReqPacket) && !(packet is MqttPingRespPacket)) { LastNonKeepAlivePacketReceivedTimestamp = LastPacketReceivedTimestamp; } } public void ResetStatistics() { _statistics.Reset(); } public void HandleSentPacket(MqttPacket packet) { if (packet == null) { throw new ArgumentNullException("packet"); } LastPacketReceivedTimestamp = DateTime.UtcNow; Interlocked.Increment(ref _statistics._receivedPacketsCount); if (packet is MqttPublishPacket) { Interlocked.Increment(ref _statistics._receivedApplicationMessagesCount); } } } public sealed class MqttClientSubscriptionsManager : IDisposable { private sealed class CreateSubscriptionResult { public bool IsNewSubscription { get; set; } public MqttSubscription Subscription { get; set; } } private static readonly List<uint> EmptySubscriptionIdentifiers = new List<uint>(); private readonly MqttServerEventContainer _eventContainer; private readonly Dictionary<ulong, HashSet<MqttSubscription>> _noWildcardSubscriptionsByTopicHash = new Dictionary<ulong, HashSet<MqttSubscription>>(); private readonly MqttRetainedMessagesManager _retainedMessagesManager; private readonly MqttSession _session; private readonly ISubscriptionChangedNotification _subscriptionChangedNotification; private readonly Dictionary<string, MqttSubscription> _subscriptions = new Dictionary<string, MqttSubscription>(); private readonly ReaderWriterLockSlim _subscriptionsLock = new ReaderWriterLockSlim(); private readonly Dictionary<ulong, TopicHashMaskSubscriptions> _wildcardSubscriptionsByTopicHash = new Dictionary<ulong, TopicHashMaskSubscriptions>(); public MqttClientSubscriptionsManager(MqttSession session, MqttServerEventContainer eventContainer, MqttRetainedMessagesManager retainedMessagesManager, ISubscriptionChangedNotification subscriptionChangedNotification) { _session = session ?? throw new ArgumentNullException("session"); _eventContainer = eventContainer ?? throw new ArgumentNullException("eventContainer"); _retainedMessagesManager = retainedMessagesManager ?? throw new ArgumentNullException("retainedMessagesManager"); _subscriptionChangedNotification = subscriptionChangedNotification; } public CheckSubscriptionsResult CheckSubscriptions(string topic, ulong topicHash, MqttQualityOfServiceLevel qualityOfServiceLevel, string senderId) { List<MqttSubscription> list = new List<MqttSubscription>(); _subscriptionsLock.EnterReadLock(); try { if (_noWildcardSubscriptionsByTopicHash.TryGetValue(topicHash, out var value)) { list.AddRange(value); } foreach (KeyValuePair<ulong, TopicHashMaskSubscriptions> item in _wildcardSubscriptionsByTopicHash) { ulong key = item.Key; foreach (KeyValuePair<ulong, HashSet<MqttSubscription>> item2 in item.Value.SubscriptionsByHashMask) { ulong key2 = item2.Key; if ((topicHash & key2) == key) { HashSet<MqttSubscription> value2 = item2.Value; list.AddRange(value2); } } } } finally { _subscriptionsLock.ExitReadLock(); } if (list.Count == 0) { return CheckSubscriptionsResult.NotSubscribed; } bool flag = string.Equals(senderId, _session.Id); int num = -1; HashSet<uint> hashSet = null; bool retainAsPublished = false; foreach (MqttSubscription item3 in list) { if ((item3.NoLocal && flag) || MqttTopicFilterComparer.Compare(topic, item3.Topic) != MqttTopicFilterCompareResult.IsMatch) { continue; } if (item3.RetainAsPublished) { retainAsPublished = true; } if ((int)item3.GrantedQualityOfServiceLevel > num) { num = (int)item3.GrantedQualityOfServiceLevel; } if (item3.Identifier != 0) { if (hashSet == null) { hashSet = new HashSet<uint>(); } hashSet.Add(item3.Identifier); } } if (num == -1) { return CheckSubscriptionsResult.NotSubscribed; } CheckSubscriptionsResult checkSubscriptionsResult = new CheckSubscriptionsResult { IsSubscribed = true, RetainAsPublished = retainAsPublished, SubscriptionIdentifiers = (hashSet?.ToList() ?? EmptySubscriptionIdentifiers), QualityOfServiceLevel = qualityOfServiceLevel }; if (num < (int)qualityOfServiceLevel) { checkSubscriptionsResult.QualityOfServiceLevel = (MqttQualityOfServiceLevel)num; } return checkSubscriptionsResult; } public void Dispose() { _subscriptionsLock?.Dispose(); } public async Task<SubscribeResult> Subscribe(MqttSubscribePacket subscribePacket, CancellationToken cancellationToken) { if (subscribePacket == null) { throw new ArgumentNullException("subscribePacket"); } IList<MqttApplicationMessage> retainedApplicationMessages = await _retainedMessagesManager.GetMessages().ConfigureAwait(continueOnCapturedContext: false); SubscribeResult result = new SubscribeResult(subscribePacket.TopicFilters.Count); List<string> addedSubscriptions = new List<string>(); List<MqttTopicFilter> finalTopicFilters = new List<MqttTopicFilter>(); foreach (MqttTopicFilter item in subscribePacket.TopicFilters.OrderByDescending((MqttTopicFilter f) => f.QualityOfServiceLevel)) { InterceptingSubscriptionEventArgs interceptingSubscriptionEventArgs = await InterceptSubscribe(item, subscribePacket.UserProperties, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); MqttTopicFilter topicFilter = interceptingSubscriptionEventArgs.TopicFilter; bool num = interceptingSubscriptionEventArgs.ProcessSubscription && interceptingSubscriptionEventArgs.Response.ReasonCode <= MqttSubscribeReasonCode.GrantedQoS2; result.UserProperties = interceptingSubscriptionEventArgs.UserProperties; result.ReasonString = interceptingSubscriptionEventArgs.ReasonString; result.ReasonCodes.Add(interceptingSubscriptionEventArgs.Response.ReasonCode); if (interceptingSubscriptionEventArgs.CloseConnection) { result.CloseConnection = true; } if (num && !string.IsNullOrEmpty(topicFilter.Topic)) { CreateSubscriptionResult createSubscriptionResult = CreateSubscription(topicFilter, subscribePacket.SubscriptionIdentifier, interceptingSubscriptionEventArgs.Response.ReasonCode); addedSubscriptions.Add(topicFilter.Topic); finalTopicFilters.Add(topicFilter); FilterRetainedApplicationMessages(retainedApplicationMessages, createSubscriptionResult, result); } } _subscriptionChangedNotification?.OnSubscriptionsAdded(_session, addedSubscriptions); if (_eventContainer.ClientSubscribedTopicEvent.HasHandlers) { foreach (MqttTopicFilter item2 in finalTopicFilters) { ClientSubscribedTopicEventArgs eventArgs = new ClientSubscribedTopicEventArgs(_session.Id, item2, _session.Items); await _eventContainer.ClientSubscribedTopicEvent.InvokeAsync(eventArgs).ConfigureAwait(continueOnCapturedContext: false); } } return result; } public async Task<UnsubscribeResult> Unsubscribe(MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken) { if (unsubscribePacket == null) { throw new ArgumentNullException("unsubscribePacket"); } UnsubscribeResult result = new UnsubscribeResult(); List<string> removedSubscriptions = new List<string>(); _subscriptionsLock.EnterWriteLock(); try { foreach (string topicFilter in unsubscribePacket.TopicFilters) { _subscriptions.TryGetValue(topicFilter, out var existingSubscription); InterceptingUnsubscriptionEventArgs interceptingUnsubscriptionEventArgs = await InterceptUnsubscribe(topicFilter, existingSubscription, unsubscribePacket.UserProperties, cancellationToken).ConfigureAwait(continueOnCapturedContext: false); bool num = interceptingUnsubscriptionEventArgs.Response.ReasonCode == MqttUnsubscribeReasonCode.Success; result.UserProperties = interceptingUnsubscriptionEventArgs.UserProperties; result.ReasonCodes.Add(interceptingUnsubscriptionEventArgs.Response.ReasonCode); if (interceptingUnsubscriptionEventArgs.CloseConnection) { result.CloseConnection = true; } if (!num) { continue; } if (interceptingUnsubscriptionEventArgs.ProcessUnsubscription) { _subscriptions.Remove(topicFilter); if (existingSubscription != null) { ulong topicHash = existingSubscription.TopicHash; HashSet<MqttSubscription> value2; if (existingSubscription.TopicHasWildcard) { if (_wildcardSubscriptionsByTopicHash.TryGetValue(topicHash, out var value)) { value.RemoveSubscription(existingSubscription); if (value.SubscriptionsByHashMask.Count == 0) { _wildcardSubscriptionsByTopicHash.Remove(topicHash); } } } else if (_noWildcardSubscriptionsByTopicHash.TryGetValue(topicHash, out value2)) { value2.Remove(existingSubscription); if (value2.Count == 0) { _noWildcardSubscriptionsByTopicHash.Remove(topicHash); } } } removedSubscriptions.Add(topicFilter); } existingSubscription = null; } } finally { _subscriptionsLock.ExitWriteLock(); _subscriptionChangedNotification?.OnSubscriptionsRemoved(_session, removedSubscriptions); } if (_eventContainer.ClientUnsubscribedTopicEvent.HasHandlers) { foreach (string topicFilter2 in unsubscribePacket.TopicFilters) { ClientUnsubscribedTopicEventArgs eventArgs = new ClientUnsubscribedTopicEventArgs(_session.Id, topicFilter2, _session.Items); await _eventContainer.ClientUnsubscribedTopicEvent.InvokeAsync(eventArgs).ConfigureAwait(continueOnCapturedContext: false); } } return result; } private CreateSubscriptionResult CreateSubscription(MqttTopicFilter topicFilter, uint subscriptionIdentifier, MqttSubscribeReasonCode reasonCode) { MqttSubscription mqttSubscription = new MqttSubscription(qualityOfServiceLevel: reasonCode switch { MqttSubscribeReasonCode.GrantedQoS0 => MqttQualityOfServiceLevel.AtMostOnce, MqttSubscribeReasonCode.GrantedQoS1 => MqttQualityOfServiceLevel.AtLeastOnce, MqttSubscribeReasonCode.GrantedQoS2 => MqttQualityOfServiceLevel.ExactlyOnce, _ => throw new InvalidOperationException(), }, topic: topicFilter.Topic, noLocal: topicFilter.NoLocal, retainHandling: topicFilter.RetainHandling, retainAsPublished: topicFilter.RetainAsPublished, identifier: subscriptionIdentifier); _subscriptionsLock.EnterWriteLock(); bool isNewSubscription; try { MqttTopicHash.Calculate(topicFilter.Topic, out var resultHash, out var _, out var resultHasWildcard); if (_subscriptions.TryGetValue(topicFilter.Topic, out var value)) { HashSet<MqttSubscription> value3; if (resultHasWildcard) { if (_wildcardSubscriptionsByTopicHash.TryGetValue(resultHash, out var value2)) { value2.RemoveSubscription(value); } } else if (_noWildcardSubscriptionsByTopicHash.TryGetValue(resultHash, out value3)) { value3.Remove(value); } } isNewSubscription = value == null; _subscriptions[topicFilter.Topic] = mqttSubscription; if (resultHasWildcard) { if (!_wildcardSubscriptionsByTopicHash.TryGetValue(resultHash, out var value4)) { value4 = new TopicHashMaskSubscriptions(); _wildcardSubscriptionsByTopicHash.Add(resultHash, value4); } value4.AddSubscription(mqttSubscription); } else { if (!_noWildcardSubscriptionsByTopicHash.TryGetValue(resultHash, out var value5)) { value5 = new HashSet<MqttSubscription>(); _noWildcardSubscriptionsByTopicHash.Add(resultHash, value5); } value5.Add(mqttSubscription); } } finally { _subscriptionsLock.ExitWriteLock(); } return new CreateSubscriptionResult { IsNewSubscription = isNewSubscription, Subscription = mqttSubscription }; } private static void FilterRetainedApplicationMessages(IList<MqttApplicationMessage> retainedMessages, CreateSubscriptionResult createSubscriptionResult, SubscribeResult subscribeResult) { if (createSubscriptionResult.Subscription.RetainHandling == MqttRetainHandling.DoNotSendOnSubscribe || (createSubscriptionResult.Subscription.RetainHandling == MqttRetainHandling.SendAtSubscribeIfNewSubscriptionOnly && !createSubscriptionResult.IsNewSubscription)) { return; } for (int num = retainedMessages.Count - 1; num >= 0; num--) { MqttApplicationMessage mqttApplicationMessage = retainedMessages[num]; if (mqttApplicationMessage != null && MqttTopicFilterComparer.Compare(mqttApplicationMessage.Topic, createSubscriptionResult.Subscription.Topic) == MqttTopicFilterCompareResult.IsMatch) { MqttRetainedMessageMatch mqttRetainedMessageMatch = new MqttRetainedMessageMatch(mqttApplicationMessage, createSubscriptionResult.Subscription.GrantedQualityOfServiceLevel); if (mqttRetainedMessageMatch.SubscriptionQualityOfServiceLevel > mqttRetainedMessageMatch.ApplicationMessage.QualityOfServiceLevel) { mqttRetainedMessageMatch.SubscriptionQualityOfServiceLevel = mqttRetainedMessageMatch.ApplicationMessage.QualityOfServiceLevel; } if (subscribeResult.RetainedMessages == null) { subscribeResult.RetainedMessages = new List<MqttRetainedMessageMatch>(); } subscribeResult.RetainedMessages.Add(mqttRetainedMessageMatch); retainedMessages[num] = null; } } } private async Task<InterceptingSubscriptionEventArgs> InterceptSubscribe(MqttTopicFilter topicFilter, List<MqttUserProperty> userProperties, CancellationToken cancellationToken) { InterceptingSubscriptionEventArgs eventArgs = new InterceptingSubscriptionEventArgs(cancellationToken, _session.Id, new MqttSessionStatus(_session), topicFilter, userProperties); if (topicFilter.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { eventArgs.Response.ReasonCode = MqttSubscribeReasonCode.GrantedQoS0; } else if (topicFilter.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { eventArgs.Response.ReasonCode = MqttSubscribeReasonCode.GrantedQoS1; } else if (topicFilter.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { eventArgs.Response.ReasonCode = MqttSubscribeReasonCode.GrantedQoS2; } if (!topicFilter.Topic.StartsWith("$share/")) { await _eventContainer.InterceptingSubscriptionEvent.InvokeAsync(eventArgs).ConfigureAwait(continueOnCapturedContext: false); } else { eventArgs.Response.ReasonCode = MqttSubscribeReasonCode.SharedSubscriptionsNotSupported; } return eventArgs; } private async Task<InterceptingUnsubscriptionEventArgs> InterceptUnsubscribe(string topicFilter, MqttSubscription mqttSubscription, List<MqttUserProperty> userProperties, CancellationToken cancellationToken) { InterceptingUnsubscriptionEventArgs interceptingUnsubscriptionEventArgs = new InterceptingUnsubscriptionEventArgs(cancellationToken, _session.Id, _session.Items, topicFilter, userProperties); interceptingUnsubscriptionEventArgs.Response.ReasonCode = ((mqttSubscription == null) ? MqttUnsubscribeReasonCode.NoSubscriptionExisted : MqttUnsubscribeReasonCode.Success); InterceptingUnsubscriptionEventArgs clientUnsubscribingTopicEventArgs = interceptingUnsubscriptionEventArgs; await _eventContainer.InterceptingUnsubscriptionEvent.InvokeAsync(clientUnsubscribingTopicEventArgs).ConfigureAwait(continueOnCapturedContext: false); return clientUnsubscribingTopicEventArgs; } } public sealed class MqttRetainedMessagesManager { private readonly Dictionary<string, MqttApplicationMessage> _messages = new Dictionary<string, MqttApplicationMessage>(4096); private readonly AsyncLock _storageAccessLock = new AsyncLock(); private readonly MqttServerEventContainer _eventContainer; private readonly MqttNetSourceLogger _logger; public MqttRetainedMessagesManager(MqttServerEventContainer eventContainer, IMqttNetLogger logger) { _eventContainer = eventContainer ?? throw new ArgumentNullException("eventContainer"); if (logger == null) { throw new ArgumentNullException("logger"); } _logger = logger.WithSource("MqttRetainedMessagesManager"); } public async Task Start() { try { LoadingRetainedMessagesEventArgs eventArgs = new LoadingRetainedMessagesEventArgs(); await _eventContainer.LoadingRetainedMessagesEvent.InvokeAsync(eventArgs).ConfigureAwait(continueOnCapturedContext: false); lock (_messages) { _messages.Clear(); if (eventArgs.LoadedRetainedMessages == null) { return; } foreach (MqttApplicationMessage loadedRetainedMessage in eventArgs.LoadedRetainedMessages) { _messages[loadedRetainedMessage.Topic] = loadedRetainedMessage; } } } catch (Exception exception) { _logger.Error(exception, "Unhandled exception while loading retained messages."); } } public async Task UpdateMessage(string clientId, MqttApplicationMessage applicationMessage) { if (applicationMessage == null) { throw new ArgumentNullException("applicationMessage"); } try { List<MqttApplicationMessage> messagesForSave = null; bool flag = false; lock (_messages) { ArraySegment<byte> payloadSegment = applicationMessage.PayloadSegment; if (payloadSegment.Count <= 0) { flag = _messages.Remove(applicationMessage.Topic); _logger.Verbose("Client '{0}' cleared retained message for topic '{1}'.", clientId, applicationMessage.Topic); } else { if (!_messages.TryGetValue(applicationMessage.Topic, out var value)) { _messages[applicationMessage.Topic] = applicationMessage; flag = true; } else if (value.QualityOfServiceLevel != applicationMessage.QualityOfServiceLevel || !SequenceEqual(value.PayloadSegment, payloadSegment)) { _messages[applicationMessage.Topic] = applicationMessage; flag = true; } _logger.Verbose("Client '{0}' set retained message for topic '{1}'.", clientId, applicationMessage.Topic); } if (flag) { messagesForSave = new List<MqttApplicationMessage>(_messages.Values); } } if (flag) { using (await _storageAccessLock.EnterAsync().ConfigureAwait(continueOnCapturedContext: false)) { RetainedMessageChangedEventArgs eventArgs = new RetainedMessageChangedEventArgs(clientId, applicationMessage, messagesForSave); await _eventContainer.RetainedMessageChangedEvent.InvokeAsync(eventArgs).ConfigureAwait(continueOnCapturedContext: false); } } } catch (Exception exception) { _logger.Error(exception, "Unhandled exception while handling retained messages."); } } public Task<IList<MqttApplicationMessage>> GetMessages() { lock (_messages) { return Task.FromResult((IList<MqttApplicationMessage>)new List<MqttApplicationMessage>(_messages.Values)); } } public Task<MqttApplicationMessage> GetMessage(string topic) { lock (_messages) { if (_messages.TryGetValue(topic, out var value)) { return Task.FromResult(value); } } return Task.FromResult<MqttApplicationMessage>(null); } public async Task ClearMessages() { lock (_messages) { _messages.Clear(); } using (await _storageAccessLock.EnterAsync().ConfigureAwait(continueOnCapturedContext: false)) { await _eventContainer.RetainedMessagesClearedEvent.InvokeAsync(EventArgs.Empty).ConfigureAwait(continueOnCapturedContext: false); } } private static bool SequenceEqual(ArraySegment<byte> source, ArraySegment<byte> target) { if (source.Count == target.Count) { return source.SequenceEqual(target); } return false; } } public sealed class MqttServerEventContainer { public AsyncEvent<ApplicationMessageNotConsumedEventArgs> ApplicationMessageNotConsumedEvent { get; } = new AsyncEvent<ApplicationMessageNotConsumedEventArgs>(); public AsyncEvent<ClientAcknowledgedPublishPacketEventArgs> ClientAcknowledgedPublishPacketEvent { get; } = new AsyncEvent<ClientAcknowledgedPublishPacketEventArgs>(); public AsyncEvent<ClientConnectedEventArgs> ClientConnectedEvent { get; } = new AsyncEvent<ClientConnectedEventArgs>(); public AsyncEvent<ClientDisconnectedEventArgs> ClientDisconnectedEvent { get; } = new AsyncEvent<ClientDisconnectedEventArgs>(); public AsyncEvent<ClientSubscribedTopicEventArgs> ClientSubscribedTopicEvent { get; } = new AsyncEvent<ClientSubscribedTopicEventArgs>(); public AsyncEvent<ClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopicEvent { get; } = new AsyncEvent<ClientUnsubscribedTopicEventArgs>(); public AsyncEvent<InterceptingClientApplicationMessageEnqueueEventArgs> InterceptingClientEnqueueEvent { get; } = new AsyncEvent<InterceptingClientApplicationMessageEnqueueEventArgs>(); public AsyncEvent<ApplicationMessageEnqueuedEventArgs> ApplicationMessageEnqueuedOrDroppedEvent { get; } = new AsyncEvent<ApplicationMessageEnqueuedEventArgs>(); public AsyncEvent<QueueMessageOverwrittenEventArgs> QueuedApplicationMessageOverwrittenEvent { get; } = new AsyncEvent<QueueMessageOverwrittenEventArgs>(); public AsyncEvent<InterceptingPacketEventArgs> InterceptingInboundPacketEvent { get; } = new AsyncEvent<InterceptingPacketEventArgs>(); public AsyncEvent<InterceptingPacketEventArgs> InterceptingOutboundPacketEvent { get; } = new AsyncEvent<InterceptingPacketEventArgs>(); public AsyncEvent<InterceptingPublishEventArgs> InterceptingPublishEvent { get; } = new AsyncEvent<InterceptingPublishEventArgs>(); public AsyncEvent<InterceptingSubscriptionEventArgs> InterceptingSubscriptionEvent { get; } = new AsyncEvent<InterceptingSubscriptionEventArgs>(); public AsyncEvent<InterceptingUnsubscriptionEventArgs> InterceptingUnsubscriptionEvent { get; } = new AsyncEvent<InterceptingUnsubscriptionEventArgs>(); public AsyncEvent<LoadingRetainedMessagesEventArgs> LoadingRetainedMessagesEvent { get; } = new AsyncEvent<LoadingRetainedMessagesEventArgs>(); public AsyncEvent<EventArgs> PreparingSessionEvent { get; } = new AsyncEvent<EventArgs>(); public AsyncEvent<RetainedMessageChangedEventArgs> RetainedMessageChangedEvent { get; } = new AsyncEvent<RetainedMessageChangedEventArgs>(); public AsyncEvent<EventArgs> RetainedMessagesClearedEvent { get; } = new AsyncEvent<EventArgs>(); public AsyncEvent<SessionDeletedEventArgs> SessionDeletedEvent { get; } = new AsyncEvent<SessionDeletedEventArgs>(); public AsyncEvent<EventArgs> StartedEvent { get; } = new AsyncEvent<EventArgs>(); public AsyncEvent<EventArgs> StoppedEvent { get; } = new AsyncEvent<EventArgs>(); public AsyncEvent<ValidatingConnectionEventArgs> ValidatingConnectionEvent { get; } = new AsyncEvent<ValidatingConnectionEventArgs>(); } public sealed class MqttServerKeepAliveMonitor { private static readonly MqttServerClientDisconnectOptions DefaultDisconnectOptions = new MqttServerClientDisconnectOptions { ReasonCode = MqttDisconnectReasonCode.KeepAliveTimeout, UserProperties = null, ReasonString = null, ServerReference = null }; private readonly MqttNetSourceLogger _logger; private readonly MqttServerOptions _options; private readonly MqttClientSessionsManager _sessionsManager; public MqttServerKeepAliveMonitor(MqttServerOptions options, MqttClientSessionsManager sessionsManager, IMqttNetLogger logger) { _options = options ?? throw new ArgumentNullException("options"); _sessionsManager = sessionsManager ?? throw new ArgumentNullException("sessionsManager"); if (logger == null) { throw new ArgumentNullException("logger"); } _logger = logger.WithSource("MqttServerKeepAliveMonitor"); } public void Start(CancellationToken cancellationToken) { Task.Factory.StartNew(delegate { DoWork(cancellationToken); }, cancellationToken, TaskCreationOptions.LongRunning).RunInBackground(_logger); } private void DoWork(CancellationToken cancellationToken) { try { _logger.Info("Starting keep alive monitor"); while (!cancellationToken.IsCancellationRequested) { TryProcessClients(); Sleep(_options.KeepAliveOptions.MonitorInterval); } } catch (OperationCanceledException) { } catch (Exception exception) { _logger.Error(exception, "Unhandled exception while checking keep alive timeouts"); } finally { _logger.Verbose("Stopped checking keep alive timeout"); } } private static void Sleep(TimeSpan timeout) { try { Thread.Sleep(timeout); } catch (ThreadAbortException) { t