class Matter::Protocol::MessageHandler

Overview

Protocol message handler that routes incoming messages to appropriate handlers

Handles:

Defined in:

matter/protocol/message_handler.cr

Constant Summary

DEFAULT_MAX_SESSIONS = 32_u16

Session management defaults

DEFAULT_SESSION_CLEANUP_INTERVAL = 5.seconds
DEFAULT_SUBSCRIPTION_GRACE_PERIOD = 30.seconds
DEFAULT_TRANSPORT_RETRY_WINDOW = 4.seconds
Log = ::Log.for("matter.protocol")
MSG_CASE_SIGMA1 = 48_u8
MSG_CASE_SIGMA2 = 49_u8
MSG_CASE_SIGMA3 = 50_u8
MSG_PASE_PAKE1 = 34_u8
MSG_PASE_PAKE2 = 35_u8
MSG_PASE_PAKE3 = 36_u8
MSG_PBKDF_PARAM_REQUEST = 32_u8
MSG_PBKDF_PARAM_RESPONSE = 33_u8
MSG_STANDALONE_ACK = 16_u8

Secure Channel Message Types

MSG_STATUS_REPORT = 64_u8
PROTOCOL_BDX = 2_u16
PROTOCOL_INTERACTION_MODEL = 1_u16
PROTOCOL_SECURE_CHANNEL = 0_u16

Protocol IDs

PROTOCOL_USER_DIRECTED_COMM = 3_u16

Constructors

Instance Method Summary

Constructor Detail

def self.new(transport : Transport::UDPTransport, setup_pin : UInt32, discriminator : UInt16, fabric_table : FabricTable, iterations : UInt32 = 1000_u32, salt : Bytes = Random::Secure.random_bytes(32), vendor_id : UInt16 = 65521_u16, product_id : UInt16 = 32769_u16, max_sessions : UInt16 = DEFAULT_MAX_SESSIONS, subscription_grace_period : Time::Span = DEFAULT_SUBSCRIPTION_GRACE_PERIOD, transport_retry_window : Time::Span = DEFAULT_TRANSPORT_RETRY_WINDOW) #

[View source]

Instance Method Detail

def active_subscriptions : Hash(UInt32, Matter::Protocol::MessageHandler::ActiveSubscription) #

Public getter for active subscriptions (for persistence)


[View source]
def cancel_cleanup_on_traffic(session_id : UInt16) : Bool #

Cancel pending cleanup for a session if traffic is detected. Called when we receive a message on a session that has cancel_on_traffic=true.


[View source]
def case_fabric : Fabric | Nil #

[View source]
def case_fabric=(case_fabric : Fabric | Nil) #

[View source]
def case_initiator_session_id : UInt16 | Nil #

[View source]
def case_initiator_session_id=(case_initiator_session_id : UInt16 | Nil) #

[View source]
def case_responder : Session::Case::CaseResponder | Nil #

CASE support


[View source]
def case_responder=(case_responder : Session::Case::CaseResponder | Nil) #

CASE support


[View source]
def case_responder_session_id : UInt16 | Nil #

[View source]
def case_responder_session_id=(case_responder_session_id : UInt16 | Nil) #

[View source]
def clusters : Hash(Tuple(UInt16, UInt32), Cluster::Base) #

[View source]
def discriminator : UInt16 #

[View source]
def discriminator=(discriminator : UInt16) #

[View source]
def fabric_table : FabricTable #

[View source]
def find_matching_subscription(session_id : UInt16, paths : Array(InteractionModel::AttributePath)) : ActiveSubscription | Nil #

Find existing subscription that matches a new subscription request (same session, overlapping paths)


[View source]
def handle_message(msg : Codec::MessageCodec::Message, peer : Socket::IPAddress) : Nil #

Main message routing entry point


[View source]
def initiator_session_id : UInt16 | Nil #

PASE session IDs from PBKDF exchange


[View source]
def initiator_session_id=(initiator_session_id : UInt16 | Nil) #

PASE session IDs from PBKDF exchange


[View source]
def iterations : UInt32 #

[View source]
def iterations=(iterations : UInt32) #

[View source]
def mark_case_resumption_failed(session_id : UInt16) : Nil #

Mark a session for cleanup due to CASE resumption failure. Called when CASE resumption is attempted but fails.


[View source]
def mark_transport_failure(session_id : UInt16) : Nil #

Mark a session as having transport failure and schedule cleanup. Called when transport reports the peer is unreachable after retries.


[View source]
def max_sessions : UInt16 #

Session management configuration


[View source]
def max_sessions=(max_sessions : UInt16) #

Session management configuration


[View source]
def next_subscription_id : UInt32 #

Subscription support


[View source]
def next_subscription_id=(next_subscription_id : UInt32) #

Subscription support


[View source]
def notify_subscriptions(endpoint_id : UInt16, cluster_id : UInt32, attribute_id : UInt32) #

Handle attribute change and send updates to matching subscriptions. This is called by clusters when their attributes change.


[View source]
def on_commissioned : Proc(Fabric, Nil) | Nil #

Commissioning callback - called when a fabric is successfully added (AddNOC complete). The device should use this to switch from commissioning to operational mDNS advertisement.


[View source]
def on_commissioned=(on_commissioned : Proc(Fabric, Nil) | Nil) #

Commissioning callback - called when a fabric is successfully added (AddNOC complete). The device should use this to switch from commissioning to operational mDNS advertisement.


[View source]
def on_get_fabric : Proc(Fabric | Nil) | Nil #

Fabric access callback - set by the device implementation. This allows the message handler to access fabric data for CASE.


[View source]
def on_get_fabric=(on_get_fabric : Proc(Fabric | Nil) | Nil) #

Fabric access callback - set by the device implementation. This allows the message handler to access fabric data for CASE.


[View source]
def on_session_established : Proc(Session::SecureContext, Nil) | Nil #

Session established callback - called when a new secure session is established (CASE or PASE). The device can use this to persist sessions for reconnection after restart.


[View source]
def on_session_established=(on_session_established : Proc(Session::SecureContext, Nil) | Nil) #

Session established callback - called when a new secure session is established (CASE or PASE). The device can use this to persist sessions for reconnection after restart.


[View source]
def on_session_removed : Proc(UInt16, Nil) | Nil #

Callback fired when a superseded session is cleaned up. The device can use this to remove the session from persistent storage.


[View source]
def on_session_removed=(on_session_removed : Proc(UInt16, Nil) | Nil) #

Callback fired when a superseded session is cleaned up. The device can use this to remove the session from persistent storage.


[View source]
def on_subscription_established : Proc(ActiveSubscription, Nil) | Nil #

Subscription established callback - called when a new subscription becomes active. The device can use this to persist subscriptions for reconnection after restart.


[View source]
def on_subscription_established=(on_subscription_established : Proc(ActiveSubscription, Nil) | Nil) #

Subscription established callback - called when a new subscription becomes active. The device can use this to persist subscriptions for reconnection after restart.


[View source]
def on_subscription_removed : Proc(UInt32, Nil) | Nil #

Callback fired when a subscription is removed (expired or renewed)


[View source]
def on_subscription_removed=(on_subscription_removed : Proc(UInt32, Nil) | Nil) #

Callback fired when a subscription is removed (expired or renewed)


[View source]
def operational_credentials_cluster : Cluster::OperationalCredentialsCluster | Nil #

[View source]
def pase_responder : Session::Pase::PaseResponder | Nil #

[View source]
def pbkdf_request_payload : Bytes | Nil #

PASE context: store request/response payloads for context hashing


[View source]
def pbkdf_request_payload=(pbkdf_request_payload : Bytes | Nil) #

PASE context: store request/response payloads for context hashing


[View source]
def pbkdf_response_payload : Bytes | Nil #

[View source]
def pbkdf_response_payload=(pbkdf_response_payload : Bytes | Nil) #

[View source]
def process_expired_subscriptions : Int32 #

Process expired subscriptions and schedule session cleanup if needed


[View source]
def process_pending_cleanups : Int32 #

Manually trigger cleanup of expired pending sessions (useful for testing)


[View source]
def product_id : UInt16 #

[View source]
def product_id=(product_id : UInt16) #

[View source]
def renew_subscription(old_subscription_id : UInt32, new_subscription : ActiveSubscription) : Nil #

Handle subscription renewal - called when a new SubscribeRequest comes in for the same attribute paths from the same session


[View source]
def responder_session_id : UInt16 | Nil #

[View source]
def responder_session_id=(responder_session_id : UInt16 | Nil) #

[View source]
def salt : Bytes #

[View source]
def salt=(salt : Bytes) #

[View source]
def sessions : Hash(UInt16, Session::SecureContext) #

[View source]
def setup_cluster_notifications #

Wire up attribute change notification callbacks for all clusters. This should be called after all clusters have been added to the clusters hash. It enables automatic subscription updates when attributes change.


[View source]
def setup_pin : UInt32 #

Device credentials for PASE


[View source]
def setup_pin=(setup_pin : UInt32) #

Device credentials for PASE


[View source]
def subscription_grace_period : Time::Span #

[View source]
def subscription_grace_period=(subscription_grace_period : Time::Span) #

[View source]
def transport : Transport::UDPTransport #

[View source]
def transport_retry_window : Time::Span #

[View source]
def transport_retry_window=(transport_retry_window : Time::Span) #

[View source]
def vendor_id : UInt16 #

[View source]
def vendor_id=(vendor_id : UInt16) #

[View source]