Connector Implementation

Not every messaging provider has a ready-made connector in the framework. When you need to integrate with a custom or niche provider — an internal notification gateway, a legacy SMTP server, a proprietary chat system — you build a connector. The framework provides ChannelConnectorBase, an abstract base class that implements the IChannelConnector contract and handles all cross-cutting concerns so you only write the provider-specific translation layer.

The design follows the Template Method pattern: the base class defines the skeleton of each operation (initialize, send, receive, check status) and calls into your overrides for the provider-specific steps. The base class handles:

  • State management — tracks the lifecycle and prevents operations when not ready

  • Capability guards — checks schema capabilities before delegating to your code

  • Message validation — validates messages against the schema before your send logic runs

  • Authentication — resolves credentials via the authentication manager

  • Error wrapping — catches exceptions from your code and wraps them in OperationResult<T>

  • Logging scopes — creates structured scopes for tracing

The result is that your override methods stay focused on one thing: translating between the framework's IMessage model and the provider's API.

Build custom connectors by extending ChannelConnectorBase. The base class provides state management, capability validation, authentication integration, message validation, logging scopes, and standardized error wrapping. You implement the provider-specific parts.

Minimum implementation

Four abstract methods must be overridden:

using Ratatosk;
using Microsoft.Extensions.Logging;

[ChannelSchema(typeof(MySchemaFactory))]
public class MyConnector : ChannelConnectorBase
{
    private HttpClient _httpClient;

    public MyConnector(
        IChannelSchema schema,
        ConnectionSettings? settings = null,
        ILogger? logger = null,
        IAuthenticationManager? authManager = null)
        : base(schema, settings, logger, authManager) { }

    // ── 1. Initialize ─────────────────────────────────────────
    // Validate settings, create provider client, authenticate.
    // Called by InitializeAsync().
    protected override ValueTask InitializeConnectorAsync(CancellationToken ct)
    {
        var apiKey = ConnectionSettings.GetParameter("ApiKey");
        if (string.IsNullOrEmpty(apiKey))
            throw new InvalidOperationException("ApiKey is required");

        _httpClient = new HttpClient();
        _httpClient.DefaultRequestHeaders.Add("Authorization", $"Bearer {apiKey}");

        SetState(ConnectorState.Ready);
        return ValueTask.CompletedTask;
    }

    // ── 2. Test connection ────────────────────────────────────
    // Lightweight ping to verify the provider is reachable.
    // Called by TestConnectionAsync().
    protected override ValueTask TestConnectorConnectionAsync(CancellationToken ct)
    {
        // Throw on failure — base class wraps the exception
        return ValueTask.CompletedTask;
    }

    // ── 3. Send message ──────────────────────────────────────
    // Translate IMessage to the provider API and send.
    // Called by SendMessageAsync().
    protected override async Task<SendResult> SendMessageCoreAsync(
        IMessage message, CancellationToken ct)
    {
        var payload = new
        {
            to = message.Receiver?.Address,
            from = message.Sender?.Address,
            text = (message.Content as TextContent)?.Text
        };

        var response = await _httpClient.PostAsJsonAsync("/api/send", payload, ct);
        response.EnsureSuccessStatusCode();

        var result = await response.Content.ReadFromJsonAsync<ApiResponse>(ct);

        return new SendResult
        {
            MessageId = message.Id,
            RemoteMessageId = result!.Id,
            Status = MessageStatus.Sent,
            Timestamp = DateTimeOffset.UtcNow
        };
    }

    // ── 4. Get status ────────────────────────────────────────
    // Return the current connector status.
    // Called by GetStatusAsync().
    protected override async Task<StatusInfo> GetConnectorStatusAsync(
        CancellationToken ct)
    {
        try
        {
            var response = await _httpClient.GetAsync("/api/health", ct);
            return new StatusInfo(
                response.IsSuccessStatusCode ? "connected" : "degraded",
                null,
                DateTimeOffset.UtcNow);
        }
        catch
        {
            return new StatusInfo("disconnected", "Provider unreachable",
                DateTimeOffset.UtcNow);
        }
    }
}

What the base class does

Concern
Provided by base class

State management

Tracks UninitializedInitializingReady → ...

Capability guards

Throws NotSupportedException if capability not set

Message validation

Calls ValidateMessage before send

Error wrapping

Catches exceptions, wraps in OperationResult<T>

Authentication

Provides AuthenticateAsync(), GetAuthenticationHeader()

Logging scopes

Auto-creates scopes per connector and per message

Cancellation

Passes token to all operations

Retry support

Override GetDefaultRetryPolicy(); configurable via builder or connection settings (see Retry Policies)

Result wrapping

Your core methods return raw values; base class wraps them

How wrapping works

Your override returns a raw SendResult or ValueTask. The base class:

  1. Validates connector state (Ready)

  2. Validates capability (e.g., SendMessages)

  3. Validates message against schema

  4. Calls your override

  5. Catches any exception

  6. Wraps the result (or error) in OperationResult<T>

This means your override can throw on error — you never need to create OperationResult<T> instances yourself.

Retry policy

Connectors can provide a default retry policy by overriding GetDefaultRetryPolicy():

The policy configured via WithRetryPolicy or individual RetrySettingsKeys.* parameters in ConnectionSettings takes precedence. See Retry Policies for details.

Optional overrides

Override only what your provider supports:

State management

A connector moves through a well-defined lifecycle: it starts uninitialized, initializes (authenticates and sets up resources), becomes ready for operations, and eventually shuts down. Errors can transition it to an error state. The base class enforces this lifecycle automatically — operations that require the Ready state return a failure result if called before initialization or after shutdown.

The base class tracks the connector's lifecycle state:

States and transitions:

The Disconnected state represents a temporary loss of connectivity (transient, may recover). Error indicates an unrecoverable failure. Shutdown is terminal.

The base class prevents operations when not in Ready state. Calling SendMessageAsync on a connector that hasn't been initialized returns a failure result with INVALID_STATE error code.

Authentication integration

The base class handles authentication automatically. During InitializeAsync(), before InitializeConnectorAsync() is called, the base class iterates through the schema's AuthenticationConfigurations, selects the first one that satisfies the provided ConnectionSettings (via IsSatisfiedBy), and calls IAuthenticationManager.AuthenticateAsync() to obtain a credential. The credential is then available in InitializeConnectorAsync() through the AuthenticationCredential property:

When auto-authentication fails

If no auth configuration matches the connection settings, the base class logs a warning but does not prevent initialization. Your connector can handle this case:

Schema auth configuration

Your connector's schema must declare what authentication it supports:

When using AddAuthenticationConfiguration(), fields with AuthenticationRole = "principal" are automatically registered as optional schema parameters.

See Authentication for the full authentication model.

Error handling

Connector code inevitably deals with provider errors: HTTP 401, rate limiting, timeouts, malformed responses. The traditional approach is to catch every exception and convert it to a result type, which clutters the connector logic with error-handling boilerplate. ChannelConnectorBase takes a different approach: your core methods throw exceptions for error conditions, and the base class catches them and converts them into OperationResult<T>.Fail() automatically. This keeps the send/receive logic focused on the happy path.

Throw exceptions from your core methods — the base class catches them and converts them to OperationResult<T>.Fail():

Error conventions

Use SCREAMING_SNAKE_CASE error codes for consistency:

  • INVALID_CREDENTIALS — authentication failed

  • RATE_LIMITED — provider rate limit hit

  • NETWORK_ERROR — connection timed out or refused

  • PROVIDER_VALIDATION_FAILED — provider rejected the message format

  • MESSAGE_TOO_LARGE — content exceeds provider limits

Logging

The base class creates structured logging scopes:

DI registration

The factory pattern

Connectors can also be created via IChannelConnectorFactory<TConnector>:

The default factory uses ActivatorUtilities.CreateInstance — it resolves constructor parameters from DI if possible, falling back to the provided values.

Full example: complete connector

Last updated