Control Plane Client

The internal/api package provides the Go client for communicating with the Plexsphere control plane. It handles HTTPS request/response calls, SSE event streaming, automatic reconnection with exponential backoff, and event dispatching.

Config

Config holds the connection parameters passed to the client constructor. This package performs no file I/O; loading configuration is the caller's responsibility.

| Field | Type | Default | Description |
|---|---|---|---|
| BaseURL | string | | Control plane API base URL (required) |
| TLSInsecureSkipVerify | bool | false | Disable TLS certificate verification |
| ConnectTimeout | time.Duration | 10s | TCP connection timeout |
| RequestTimeout | time.Duration | 30s | Full HTTP request/response timeout |
| SSEIdleTimeout | time.Duration | 90s | Max idle time before SSE reconnect |

go
cfg := api.Config{
    BaseURL:               "https://api.plexsphere.com",
    TLSInsecureSkipVerify: false,
}
cfg.ApplyDefaults() // sets zero-valued timeouts to defaults
if err := cfg.Validate(); err != nil {
    log.Fatal(err)
}
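
As a rough illustration of what ApplyDefaults and Validate might do, the sketch below mirrors the documented fields and defaults. The Config type here is a local stand-in rather than the package's own, and the URL checks in Validate are assumptions; the package may enforce different rules.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"time"
)

// Config is a local stand-in that mirrors the documented fields.
type Config struct {
	BaseURL               string
	TLSInsecureSkipVerify bool
	ConnectTimeout        time.Duration
	RequestTimeout        time.Duration
	SSEIdleTimeout        time.Duration
}

// ApplyDefaults fills zero-valued timeouts with the documented defaults
// and leaves explicitly set values untouched.
func (c *Config) ApplyDefaults() {
	if c.ConnectTimeout == 0 {
		c.ConnectTimeout = 10 * time.Second
	}
	if c.RequestTimeout == 0 {
		c.RequestTimeout = 30 * time.Second
	}
	if c.SSEIdleTimeout == 0 {
		c.SSEIdleTimeout = 90 * time.Second
	}
}

// Validate checks the one required field. The scheme and host checks
// are assumed for illustration.
func (c *Config) Validate() error {
	if c.BaseURL == "" {
		return errors.New("config: BaseURL is required")
	}
	u, err := url.Parse(c.BaseURL)
	if err != nil {
		return fmt.Errorf("config: invalid BaseURL: %w", err)
	}
	if (u.Scheme != "http" && u.Scheme != "https") || u.Host == "" {
		return fmt.Errorf("config: BaseURL %q must be an absolute http(s) URL", c.BaseURL)
	}
	return nil
}

func main() {
	cfg := Config{BaseURL: "https://api.plexsphere.com", RequestTimeout: 5 * time.Second}
	cfg.ApplyDefaults()
	fmt.Println(cfg.ConnectTimeout, cfg.RequestTimeout, cfg.SSEIdleTimeout, cfg.Validate())
}
```

Only zero values are replaced, so a caller who sets RequestTimeout explicitly keeps that value.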

ControlPlane

ControlPlane is the core HTTP client. It manages authentication, JSON serialization, gzip compression, and error mapping.

Constructor

go
func NewControlPlane(cfg Config, version string, logger *slog.Logger) (*ControlPlane, error)

NewControlPlane applies the config defaults, validates the result, and returns a client that:

  • Uses the configured TLS settings, connect timeout, and request timeout
  • Sets User-Agent: plexd/{version} on all requests
  • Gzip-compresses request bodies larger than 1 KiB
  • Transparently decompresses gzip responses
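
The size-based compression rule can be sketched as follows. This is a minimal illustration using the standard compress/gzip package; the helper names maybeCompress and decompress are hypothetical, and the real client presumably also sets a Content-Encoding: gzip header when it compresses.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gzipThreshold is the documented 1 KiB cutoff.
const gzipThreshold = 1024

// maybeCompress returns the bytes to send and whether they were
// gzip-compressed. Bodies of 1 KiB or less pass through unchanged.
func maybeCompress(body []byte) ([]byte, bool, error) {
	if len(body) <= gzipThreshold {
		return body, false, nil
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(body); err != nil {
		return nil, false, err
	}
	// Close flushes the remaining data and writes the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, false, err
	}
	return buf.Bytes(), true, nil
}

// decompress reverses maybeCompress for a gzip-encoded payload.
func decompress(data []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	small := []byte(`{"ok":true}`)
	large := bytes.Repeat([]byte("a"), 4096)
	for _, body := range [][]byte{small, large} {
		out, compressed, err := maybeCompress(body)
		if err != nil {
			panic(err)
		}
		fmt.Printf("in=%d out=%d gzip=%v\n", len(body), len(out), compressed)
	}
}
```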

Authentication

go
client.SetAuthToken("node-identity-token")

SetAuthToken is thread-safe (guarded by a sync.RWMutex). The token is sent as Authorization: Bearer {token} on every request. Call SetAuthToken after registration to switch from the bootstrap token to the node identity token.
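
A token holder guarded by sync.RWMutex might look like the sketch below. The tokenStore type and its methods are hypothetical names used only to illustrate the locking pattern; they are not part of the package's API.

```go
package main

import (
	"fmt"
	"net/http"
	"sync"
)

// tokenStore is a hypothetical holder for the current bearer token.
type tokenStore struct {
	mu    sync.RWMutex
	token string
}

// Set swaps the token under the write lock.
func (s *tokenStore) Set(token string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.token = token
}

// header returns the Authorization header value, or "" if no token is set.
// Concurrent requests only take the read lock.
func (s *tokenStore) header() string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s.token == "" {
		return ""
	}
	return "Bearer " + s.token
}

// apply injects the header into an outgoing request.
func (s *tokenStore) apply(req *http.Request) {
	if h := s.header(); h != "" {
		req.Header.Set("Authorization", h)
	}
}

func main() {
	store := &tokenStore{}
	store.Set("bootstrap-token")

	req, err := http.NewRequest(http.MethodGet, "https://api.plexsphere.com/v1/nodes/n1/state", nil)
	if err != nil {
		panic(err)
	}
	store.apply(req)
	fmt.Println(req.Header.Get("Authorization"))

	// After registration, switch to the node identity token.
	store.Set("node-identity-token")
	store.apply(req)
	fmt.Println(req.Header.Get("Authorization"))
}
```

A read-write mutex suits this access pattern: every request reads the token, while writes happen rarely (after registration or rotation).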

API Methods

All methods accept a context.Context for cancellation and return typed responses.

| Method | HTTP | Path | Request Type | Response Type |
|---|---|---|---|---|
| Register | POST | /v1/register | RegisterRequest | *RegisterResponse |
| Heartbeat | POST | /v1/nodes/{node_id}/heartbeat | HeartbeatRequest | *HeartbeatResponse |
| Deregister | POST | /v1/nodes/{node_id}/deregister | | |
| FetchState | GET | /v1/nodes/{node_id}/state | | *StateResponse |
| ConnectSSE | GET | /v1/nodes/{node_id}/events | | *http.Response |
| RotateKeys | POST | /v1/keys/rotate | KeyRotateRequest | *KeyRotateResponse |
| UpdateCapabilities | PUT | /v1/nodes/{node_id}/capabilities | CapabilitiesPayload | |
| ReportEndpoint | PUT | /v1/nodes/{node_id}/endpoint | EndpointReport | *EndpointResponse |
| ReportDrift | POST | /v1/nodes/{node_id}/drift | DriftReport | |
| FetchSecret | GET | /v1/nodes/{node_id}/secrets/{key} | | *SecretResponse |
| SyncReports | POST | /v1/nodes/{node_id}/report | ReportSyncRequest | |
| AckExecution | POST | /v1/nodes/{node_id}/executions/{id}/ack | ExecutionAck | |
| ReportResult | POST | /v1/nodes/{node_id}/executions/{id}/result | ExecutionResult | |
| ReportMetrics | POST | /v1/nodes/{node_id}/metrics | MetricBatch | |
| ReportLogs | POST | /v1/nodes/{node_id}/logs | LogBatch | |
| ReportAudit | POST | /v1/nodes/{node_id}/audit | AuditBatch | |
| FetchArtifact | GET | /v1/artifacts/plexd/{version}/{os}/{arch} | | io.ReadCloser |
| TunnelReady | POST | /v1/nodes/{node_id}/tunnels/{session_id}/ready | TunnelReadyRequest | |
| TunnelClosed | POST | /v1/nodes/{node_id}/tunnels/{session_id}/closed | TunnelClosedRequest | |
| ReportIntegrityViolation | POST | /v1/nodes/{node_id}/integrity/violations | IntegrityViolationReport | |

Generic Helpers

go
func (c *ControlPlane) Ping(ctx context.Context) error
func (c *ControlPlane) PostJSON(ctx context.Context, path string, body any, result any) error
func (c *ControlPlane) GetJSON(ctx context.Context, path string, result any) error

Error Types

HTTP errors are mapped to structured *APIError values supporting errors.Is and errors.As.

| Sentinel | Status | Description |
|---|---|---|
| ErrBadRequest | 400 | Invalid request |
| ErrUnauthorized | 401 | Authentication failure |
| ErrForbidden | 403 | Access denied (permanent) |
| ErrNotFound | 404 | Resource not found (permanent) |
| ErrConflict | 409 | Conflict |
| ErrPayloadTooLarge | 413 | Request body too large |
| ErrRateLimit | 429 | Rate limited (has RetryAfter) |
| ErrServer | 5xx | Server error (matches any 5xx) |

go
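resp, err := client.FetchState(ctx, nodeID)
var apiErr *api.APIError
switch {
case errors.Is(err, api.ErrUnauthorized):
    // re-authenticate
case errors.Is(err, api.ErrRateLimit) && errors.As(err, &apiErr):
    // apiErr is only dereferenced once errors.As has succeeded
    time.Sleep(apiErr.RetryAfter)
case err != nil:
    log.Printf("fetch state: %v", err)
default:
    _ = resp // use the fetched state
}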
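
One way a single ErrServer sentinel can match every 5xx status is a custom Is method on the error type. The sketch below is an assumption about how such matching could be built with the standard errors package; the APIError fields other than RetryAfter, and the sentinel definitions, are local stand-ins.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// APIError is a local stand-in for the package's structured error.
type APIError struct {
	StatusCode int
	Message    string
	RetryAfter time.Duration
}

func (e *APIError) Error() string {
	return fmt.Sprintf("api: status %d: %s", e.StatusCode, e.Message)
}

// Sentinels carry only a status code and are compared through Is.
var (
	ErrNotFound  = &APIError{StatusCode: 404}
	ErrRateLimit = &APIError{StatusCode: 429}
	ErrServer    = &APIError{StatusCode: 500}
)

// Is is consulted by errors.Is. ErrServer matches any 5xx status;
// every other sentinel matches on an exact status code.
func (e *APIError) Is(target error) bool {
	t, ok := target.(*APIError)
	if !ok {
		return false
	}
	if t == ErrServer {
		return e.StatusCode >= 500 && e.StatusCode <= 599
	}
	return e.StatusCode == t.StatusCode
}

func main() {
	// Wrapping keeps the chain intact for errors.Is and errors.As.
	err := fmt.Errorf("fetch state: %w", &APIError{StatusCode: 503, Message: "unavailable"})
	fmt.Println(errors.Is(err, ErrServer), errors.Is(err, ErrNotFound))

	var apiErr *APIError
	if errors.As(err, &apiErr) {
		fmt.Println(apiErr.StatusCode, apiErr.Message)
	}
}
```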

SSEManager

SSEManager is the top-level orchestrator that wires together SSE streaming, reconnection, verification, and event dispatching.

Lifecycle

go
logger := slog.Default()
mgr := api.NewSSEManager(client, nil, logger) // nil verifier = NoOpVerifier

// Register handlers before Start
mgr.RegisterHandler("peer_added", func(ctx context.Context, env api.SignedEnvelope) error {
    // handle peer addition
    return nil
})
mgr.RegisterHandler("policy_updated", func(ctx context.Context, env api.SignedEnvelope) error {
    // handle policy change
    return nil
})

// Start blocks until context cancelled, Shutdown called, or permanent error
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // release the context; an unused cancel would not compile
go func() {
    if err := mgr.Start(ctx, nodeID); err != nil {
        log.Printf("SSE manager stopped: %v", err)
    }
}()

// Later: graceful shutdown
mgr.Shutdown()

Methods

| Method | Description |
|---|---|
| NewSSEManager | Creates manager with client, optional verifier, logger |
| RegisterHandler | Registers an event handler by type (call before Start) |
| Start(ctx, nodeID) | Blocking SSE loop with automatic reconnection |
| Shutdown() | Cancels internal context, causes Start to return |
| SetPollFunc(fn) | Overrides the default polling function (FetchState) |
| SetReconnectIntervals | Configures backoff base and max intervals |
| SetPollingFallback | Configures polling fallback threshold and interval |

EventVerifier

Pluggable interface for verifying signed event envelopes. The default NoOpVerifier accepts all events. A concrete Ed25519 implementation will be provided by feature S010.

go
type EventVerifier interface {
    Verify(ctx context.Context, envelope SignedEnvelope) error
}
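
To show how a verifier plugs into this interface, the sketch below implements it with the standard crypto/ed25519 package. It is not the S010 implementation: the SignedEnvelope fields (Payload, Signature) are hypothetical, and the choice to sign the raw payload bytes is an assumption made only for illustration.

```go
package main

import (
	"context"
	"crypto/ed25519"
	"errors"
	"fmt"
)

// SignedEnvelope is a stand-in; both fields are hypothetical.
type SignedEnvelope struct {
	Payload   []byte
	Signature []byte
}

type EventVerifier interface {
	Verify(ctx context.Context, envelope SignedEnvelope) error
}

// NoOpVerifier accepts every envelope, as the documented default does.
type NoOpVerifier struct{}

func (NoOpVerifier) Verify(context.Context, SignedEnvelope) error { return nil }

var ErrBadSignature = errors.New("verifier: invalid signature")

// Ed25519Verifier checks the signature over the payload bytes.
type Ed25519Verifier struct {
	Key ed25519.PublicKey
}

func (v Ed25519Verifier) Verify(_ context.Context, env SignedEnvelope) error {
	// ed25519.Verify panics on a wrong-sized key, so check the length first.
	if len(v.Key) != ed25519.PublicKeySize || !ed25519.Verify(v.Key, env.Payload, env.Signature) {
		return ErrBadSignature
	}
	return nil
}

// demo signs an envelope, verifies it, then tampers with the payload.
func demo() (validAccepted, tamperedRejected bool) {
	pub, priv, err := ed25519.GenerateKey(nil) // nil selects crypto/rand
	if err != nil {
		panic(err)
	}
	env := SignedEnvelope{Payload: []byte(`{"event_type":"peer_added"}`)}
	env.Signature = ed25519.Sign(priv, env.Payload)

	var v EventVerifier = Ed25519Verifier{Key: pub}
	validAccepted = v.Verify(context.Background(), env) == nil

	env.Payload = append(env.Payload, '!')
	tamperedRejected = v.Verify(context.Background(), env) != nil
	return validAccepted, tamperedRejected
}

func main() {
	fmt.Println(demo())
}
```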

EventDispatcher

Routes verified events to registered handlers by event_type.

  • Multiple handlers per event type (invoked sequentially in registration order)
  • Handler errors are logged but do not block subsequent handlers
  • Unhandled event types are logged at debug level and discarded
  • Thread-safe handler registration via sync.RWMutex
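
The dispatch rules listed above can be sketched as below. The Dispatcher type, its method names, and the SignedEnvelope fields are stand-ins chosen for illustration and may differ from the package's actual definitions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"sync"
)

// SignedEnvelope is a stand-in; only EventType matters for routing here.
type SignedEnvelope struct {
	EventType string
	Payload   []byte
}

type HandlerFunc func(ctx context.Context, env SignedEnvelope) error

// Dispatcher is a hypothetical name for the routing component.
type Dispatcher struct {
	mu       sync.RWMutex
	handlers map[string][]HandlerFunc
	logger   *slog.Logger
}

func NewDispatcher(logger *slog.Logger) *Dispatcher {
	return &Dispatcher{handlers: make(map[string][]HandlerFunc), logger: logger}
}

// Register appends a handler, preserving registration order.
func (d *Dispatcher) Register(eventType string, h HandlerFunc) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.handlers[eventType] = append(d.handlers[eventType], h)
}

// Dispatch runs every handler for the event type in order and returns
// how many were invoked. A failing handler is logged and does not stop
// the handlers after it.
func (d *Dispatcher) Dispatch(ctx context.Context, env SignedEnvelope) int {
	d.mu.RLock()
	hs := append([]HandlerFunc(nil), d.handlers[env.EventType]...) // copy, then unlock
	d.mu.RUnlock()

	if len(hs) == 0 {
		d.logger.Debug("unhandled event type", "event_type", env.EventType)
		return 0
	}
	for i, h := range hs {
		if err := h(ctx, env); err != nil {
			d.logger.Error("handler failed", "event_type", env.EventType, "index", i, "error", err)
		}
	}
	return len(hs)
}

// demo registers two handlers for one type and records the call order.
func demo() []string {
	d := NewDispatcher(slog.Default())
	var calls []string
	d.Register("peer_added", func(context.Context, SignedEnvelope) error {
		calls = append(calls, "first")
		return errors.New("boom")
	})
	d.Register("peer_added", func(context.Context, SignedEnvelope) error {
		calls = append(calls, "second")
		return nil
	})
	d.Dispatch(context.Background(), SignedEnvelope{EventType: "peer_added"})
	d.Dispatch(context.Background(), SignedEnvelope{EventType: "unknown"}) // discarded
	return calls
}

func main() {
	fmt.Println(demo())
}
```

Copying the handler slice before releasing the read lock means a slow handler never blocks a concurrent Register call.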

Event Type Constants

All 24 SSE event types from the control plane:

| Constant | Value |
|---|---|
| EventPeerAdded | peer_added |
| EventPeerRemoved | peer_removed |
| EventPeerKeyRotated | peer_key_rotated |
| EventPeerEndpointChanged | peer_endpoint_changed |
| EventPolicyUpdated | policy_updated |
| EventActionRequest | action_request |
| EventSessionRevoked | session_revoked |
| EventSSHSessionSetup | ssh_session_setup |
| EventRotateKeys | rotate_keys |
| EventSigningKeyRotated | signing_key_rotated |
| EventNodeStateUpdated | node_state_updated |
| EventNodeSecretsUpdated | node_secrets_updated |
| EventBridgeConfigUpdated | bridge_config_updated |
| EventRelaySessionAssigned | relay_session_assigned |
| EventRelaySessionRevoked | relay_session_revoked |
| EventUserAccessConfigUpdated | user_access_config_updated |
| EventUserAccessPeerAssigned | user_access_peer_assigned |
| EventUserAccessPeerRevoked | user_access_peer_revoked |
| EventIngressConfigUpdated | ingress_config_updated |
| EventIngressRuleAssigned | ingress_rule_assigned |
| EventIngressRuleRevoked | ingress_rule_revoked |
| EventSiteToSiteConfigUpdated | site_to_site_config_updated |
| EventSiteToSiteTunnelAssigned | site_to_site_tunnel_assigned |
| EventSiteToSiteTunnelRevoked | site_to_site_tunnel_revoked |

ReconnectEngine

Manages SSE reconnection with exponential backoff and polling fallback.

Backoff Parameters

| Parameter | Default | Description |
|---|---|---|
| Base interval | 1s | Initial backoff delay |
| Multiplier | 2x | Exponential growth factor |
| Max interval | 60s | Backoff cap |
| Jitter | ±25% | Random variation on each delay |
| Polling fallback | 5 min | Time before switching to polling |
| Poll interval | 60s | How often to poll during fallback |
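
With these defaults, the nominal delays run 1s, 2s, 4s, 8s, 16s, 32s, and then stay at the 60s cap. The sketch below computes a jittered delay under two assumptions that the table does not settle: the cap is applied before jitter, and jitter is drawn uniformly. The Backoff type and its method names are hypothetical.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Backoff is a hypothetical holder for the documented parameters.
type Backoff struct {
	Base       time.Duration
	Max        time.Duration
	Multiplier float64
	Jitter     float64 // 0.25 means ±25%
}

func DefaultBackoff() Backoff {
	return Backoff{Base: time.Second, Max: 60 * time.Second, Multiplier: 2, Jitter: 0.25}
}

// Nominal returns the un-jittered delay for a zero-based attempt,
// growing by Multiplier each attempt and capped at Max.
func (b Backoff) Nominal(attempt int) time.Duration {
	d := float64(b.Base)
	for i := 0; i < attempt; i++ {
		d *= b.Multiplier
		if d >= float64(b.Max) {
			return b.Max
		}
	}
	return time.Duration(d)
}

// Delay scales the nominal delay by a random factor in [1-Jitter, 1+Jitter).
func (b Backoff) Delay(attempt int) time.Duration {
	factor := 1 + b.Jitter*(2*rand.Float64()-1)
	return time.Duration(float64(b.Nominal(attempt)) * factor)
}

func main() {
	b := DefaultBackoff()
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Printf("attempt %d: nominal=%v jittered=%v\n", attempt, b.Nominal(attempt), b.Delay(attempt))
	}
}
```

Jitter spreads reconnect attempts from many nodes over time, which avoids synchronized retries after a control plane outage.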

Failure Classification

| Error Type | Action |
|---|---|
| Network / 5xx | RetryTransient: exponential backoff |
| 401 Unauthorized | RetryAuth: invoke callback, stop |
| 429 Rate Limited | RespectServer: use Retry-After header |
| 403 / 404 | PermanentFailure: stop reconnection |
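
The classification above amounts to a switch on the failure kind. In the sketch below, the Action type and the Classify function are illustrative names; passing a status of 0 for network-level failures and treating statuses not listed in the table as transient are both assumptions.

```go
package main

import (
	"fmt"
	"net/http"
)

// Action names follow the table; the type itself is a stand-in.
type Action int

const (
	RetryTransient Action = iota
	RetryAuth
	RespectServer
	PermanentFailure
)

func (a Action) String() string {
	return [...]string{"RetryTransient", "RetryAuth", "RespectServer", "PermanentFailure"}[a]
}

// Classify maps an HTTP status to a reconnect action. A status of 0
// stands for a network-level failure with no HTTP response (assumed
// convention). Unlisted statuses fall back to RetryTransient (assumed).
func Classify(status int) Action {
	switch status {
	case http.StatusUnauthorized:
		return RetryAuth
	case http.StatusTooManyRequests:
		return RespectServer
	case http.StatusForbidden, http.StatusNotFound:
		return PermanentFailure
	default:
		return RetryTransient
	}
}

func main() {
	for _, status := range []int{0, 401, 403, 404, 429, 500, 503} {
		fmt.Printf("%d -> %s\n", status, Classify(status))
	}
}
```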

State Machine

SSE Parser

W3C-compliant text/event-stream line protocol parser.

  • Handles event:, data:, id:, retry: fields
  • Multi-line data: fields concatenated with \n
  • Comment lines (: prefix) ignored (used as keepalives)
  • Tracks Last-Event-ID for reconnection replay
  • retry: field updates reconnection interval via callback
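
The field rules above can be sketched with a line scanner. This is a simplified illustration rather than the package's parser: the Event type and function names are hypothetical, bufio.Scanner splits only on \n and \r\n (the specification also allows a bare \r), and its default 64 KiB line limit applies.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strconv"
	"strings"
	"time"
)

// Event is a hypothetical parsed event.
type Event struct {
	Type string
	Data string
	ID   string
}

// Parse reads r to EOF, calling onEvent at each blank line that ends an
// event with data, and onRetry for each valid retry: field. It returns
// the last event ID seen, for use in a Last-Event-ID header.
func Parse(r io.Reader, onEvent func(Event), onRetry func(time.Duration)) (string, error) {
	var eventType, lastID string
	var data []string

	sc := bufio.NewScanner(r)
	for sc.Scan() {
		line := sc.Text()
		if line == "" { // blank line: dispatch and reset
			if len(data) > 0 {
				typ := eventType
				if typ == "" {
					typ = "message" // default type per the specification
				}
				onEvent(Event{Type: typ, Data: strings.Join(data, "\n"), ID: lastID})
			}
			eventType, data = "", nil
			continue
		}
		if strings.HasPrefix(line, ":") { // comment, used as keepalive
			continue
		}
		field, value, _ := strings.Cut(line, ":")
		value = strings.TrimPrefix(value, " ") // strip one leading space only
		switch field {
		case "event":
			eventType = value
		case "data":
			data = append(data, value)
		case "id":
			if !strings.ContainsRune(value, 0) {
				lastID = value
			}
		case "retry":
			if ms, err := strconv.Atoi(value); err == nil && ms >= 0 {
				onRetry(time.Duration(ms) * time.Millisecond)
			}
		}
	}
	return lastID, sc.Err()
}

// parseString is a convenience wrapper that collects events from a string.
func parseString(s string) ([]Event, string) {
	var events []Event
	lastID, _ := Parse(strings.NewReader(s), func(e Event) { events = append(events, e) }, func(time.Duration) {})
	return events, lastID
}

func main() {
	stream := ": keepalive\nevent: peer_added\nid: 7\ndata: line1\ndata: line2\n\nretry: 3000\n\n"
	events, lastID := parseString(stream)
	fmt.Printf("%+v lastID=%q\n", events, lastID)
}
```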

SSE Stream

SSEStream wraps the parser with HTTP connectivity, envelope parsing, verification, and dispatching.

  • Connects via ControlPlane.ConnectSSE with Accept: text/event-stream
  • Sends Last-Event-ID header on reconnection
  • Parses each data: payload as a SignedEnvelope
  • Passes envelope through EventVerifier before dispatching
  • Malformed events are logged and skipped without disconnecting
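
The per-event pipeline (decode, verify, dispatch, skip on failure) might look like the sketch below. The JSON field names other than event_type are hypothetical, processData is an illustrative name, and dropping events that fail verification is an assumption about behavior the list above does not spell out.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
)

// SignedEnvelope is a stand-in; payload and signature are hypothetical fields.
type SignedEnvelope struct {
	EventType string          `json:"event_type"`
	Payload   json.RawMessage `json:"payload"`
	Signature string          `json:"signature"`
}

// processData handles one data: payload. It reports whether the event
// was dispatched. Failures are logged and skipped so the stream stays
// connected.
func processData(data string, verify func(SignedEnvelope) error, dispatch func(SignedEnvelope)) bool {
	var env SignedEnvelope
	if err := json.Unmarshal([]byte(data), &env); err != nil {
		log.Printf("sse: skipping malformed event: %v", err)
		return false
	}
	if env.EventType == "" {
		log.Printf("sse: skipping event without event_type")
		return false
	}
	if err := verify(env); err != nil {
		log.Printf("sse: dropping unverified %q event: %v", env.EventType, err)
		return false
	}
	dispatch(env)
	return true
}

// demo feeds four payloads through the pipeline and returns the event
// types that reached the dispatcher.
func demo() []string {
	var dispatched []string
	verify := func(env SignedEnvelope) error {
		if env.Signature == "" {
			return errors.New("missing signature")
		}
		return nil
	}
	dispatch := func(env SignedEnvelope) { dispatched = append(dispatched, env.EventType) }

	for _, data := range []string{
		`{"event_type":"peer_added","payload":{},"signature":"c2ln"}`,
		`{not json`,
		`{"event_type":"policy_updated","payload":{}}`,
		`{"event_type":"peer_removed","payload":{},"signature":"c2ln"}`,
	} {
		processData(data, verify, dispatch)
	}
	return dispatched
}

func main() {
	fmt.Println(demo())
}
```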