Log Forwarding

The internal/logfwd package forwards system and application logs from plexd mesh nodes to the control plane via POST /v1/nodes/{node_id}/logs. All log sources are abstracted behind injectable interfaces for testability.

The Forwarder runs a single goroutine that selects over two independent tickers: one for collection and one for reporting. Collected log entries are buffered in memory and flushed to the control plane at the configured report interval.

Log Format and Delivery

Logs are collected from the configured sources and delivered to the control plane as batched POST requests whose body is a JSON array of log entries (see API Contract). The buffer is flushed every report_interval (default 30s), and each flush is split into requests of at most batch_size (default 200) entries.

Each log entry is serialized as:

```json
{
  "timestamp": "2025-01-15T10:30:00.123Z",
  "source": "journald",
  "unit": "plexd.service",
  "message": "reconciliation completed, 0 drifts corrected",
  "severity": "info",
  "hostname": "web-01"
}
```
  • Filtering: Severity-level filters (min_severity), unit inclusion lists (include_units), and unit exclusion lists (exclude_units) are applied before batching.
  • File patterns: Glob patterns in file_patterns specify additional log files to monitor beyond journald.
  • Offline buffering: When the control plane is unreachable, unsent entries are retained in a bounded in-memory buffer (at most 2 * batch_size entries, oldest dropped first) and retried on the next report cycle.

Config

Config holds log forwarding parameters.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| Enabled | bool | true | Whether log forwarding is active |
| CollectInterval | time.Duration | 10s | Interval between collection cycles (min 5s) |
| ReportInterval | time.Duration | 30s | Interval between reporting to control plane |
| BatchSize | int | 200 | Maximum log entries per report batch (min 1) |
| FilePatterns | []string | nil | Glob patterns for file-based log collection |
| Filter | FilterConfig | (empty) | Log filtering rules (see LogFilter section) |
| LocalEndpoint | api.LocalEndpointConfig | (zero) | Optional local endpoint for dual-destination delivery (see below) |

```go
cfg := logfwd.Config{}
cfg.ApplyDefaults() // Enabled=true, CollectInterval=10s, ReportInterval=30s, BatchSize=200
if err := cfg.Validate(); err != nil {
    log.Fatal(err)
}
```

ApplyDefaults sets Enabled=true on a zero-valued Config. To disable log forwarding, set Enabled=false after calling ApplyDefaults.

Validation Rules

| Field | Rule | Error Message |
| --- | --- | --- |
| CollectInterval | >= 5s | logfwd: config: CollectInterval must be at least 5s |
| ReportInterval | >= CollectInterval | logfwd: config: ReportInterval must be >= CollectInterval |
| BatchSize | >= 1 | logfwd: config: BatchSize must be at least 1 |

When Enabled=false, validation is skipped entirely (including LocalEndpoint validation).

Local Endpoint

For a step-by-step setup guide, see Setting Up Local Endpoint Delivery.

LocalEndpoint allows logs to be sent to an additional local endpoint alongside the control plane. The type is api.LocalEndpointConfig, defined once in internal/api/types.go and shared across all three observability pipelines.

| Field | Type | YAML Key | Description |
| --- | --- | --- | --- |
| URL | string | local_endpoint.url | HTTPS endpoint URL. Empty means not configured. |
| SecretKey | string | local_endpoint.secret_key | Auth credential. Required when URL is set. Redacted in config dumps. |
| TLSInsecureSkipVerify | bool | local_endpoint.tls_insecure_skip_verify | Disable TLS certificate verification. |

Validation rules (applied only when Enabled=true and URL is non-empty):

| Rule | Error Message |
| --- | --- |
| URL must be parseable | logfwd: config: local_endpoint: invalid URL "<url>" |
| Scheme must be https | logfwd: config: local_endpoint: URL must be HTTPS, got "<scheme>" |
| SecretKey must be non-empty | logfwd: config: local_endpoint: SecretKey is required when URL is set |

A zero-valued LocalEndpointConfig (all fields empty/false) is valid and means "not configured".

```yaml
log_fwd:
  enabled: true
  collect_interval: 10s
  report_interval: 30s
  batch_size: 200
  local_endpoint:
    url: https://logs.local:9090/ingest
    secret_key: local-logs-token
    tls_insecure_skip_verify: false
```
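
The local endpoint rules can be sketched with net/url. LocalEndpointConfig and validateLocalEndpoint below are hypothetical stand-ins for api.LocalEndpointConfig and whatever check the package performs internally; the sketch assumes the caller has already confirmed Enabled=true.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// LocalEndpointConfig is a hypothetical local mirror of api.LocalEndpointConfig.
type LocalEndpointConfig struct {
	URL                   string
	SecretKey             string
	TLSInsecureSkipVerify bool
}

// validateLocalEndpoint applies the documented rules in table order.
// It assumes log forwarding is enabled; callers skip it otherwise.
func validateLocalEndpoint(le LocalEndpointConfig) error {
	if le.URL == "" {
		return nil // zero value means "not configured"
	}
	u, err := url.Parse(le.URL)
	if err != nil {
		return fmt.Errorf("logfwd: config: local_endpoint: invalid URL %q", le.URL)
	}
	if u.Scheme != "https" {
		return fmt.Errorf("logfwd: config: local_endpoint: URL must be HTTPS, got %q", u.Scheme)
	}
	if le.SecretKey == "" {
		return errors.New("logfwd: config: local_endpoint: SecretKey is required when URL is set")
	}
	return nil
}

func main() {
	fmt.Println(validateLocalEndpoint(LocalEndpointConfig{}))
	fmt.Println(validateLocalEndpoint(LocalEndpointConfig{URL: "http://logs.local:9090/ingest", SecretKey: "t"}))
	fmt.Println(validateLocalEndpoint(LocalEndpointConfig{URL: "https://logs.local:9090/ingest", SecretKey: "local-logs-token"}))
}
```

Because url.Parse accepts most strings, the HTTPS scheme check is the rule that rejects most malformed values in practice.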

LogSource

Interface for subsystem-specific log collection. Each source returns a slice of api.LogEntry.

```go
type LogSource interface {
    Collect(ctx context.Context) ([]api.LogEntry, error)
}
```

LogReporter

Interface abstracting the control plane log reporting API. Satisfied by api.ControlPlane.

```go
type LogReporter interface {
    ReportLogs(ctx context.Context, nodeID string, batch api.LogBatch) error
}
```

LocalReporter

LocalReporter implements LogReporter by POSTing log batches to a locally configured HTTPS endpoint with bearer-token authentication. It operates independently of the control plane client, with its own http.Client, TLS settings, and credential cache.

Constructor

```go
func NewLocalReporter(cfg api.LocalEndpointConfig, fetcher SecretFetcher, nsk []byte, nodeID string, logger *slog.Logger) *LocalReporter
```

| Parameter | Description |
| --- | --- |
| cfg | Local endpoint configuration (URL, secret key, TLS settings) |
| fetcher | SecretFetcher for retrieving encrypted credentials (satisfied by api.ControlPlane) |
| nsk | Node secret key bytes for AES-256-GCM decryption via nodeapi.DecryptSecret |
| nodeID | Node identifier passed to SecretFetcher.FetchSecret |
| logger | Structured logger (log/slog) |

SecretFetcher

```go
type SecretFetcher interface {
    FetchSecret(ctx context.Context, nodeID, key string) (*api.SecretResponse, error)
}
```

Defined in the logfwd package. The api.ControlPlane client satisfies this interface.

HTTP Client

| Setting | Value | Notes |
| --- | --- | --- |
| Timeout | 10s | Per-request timeout |
| TLS | Configurable | TLSInsecureSkipVerify controls certificate validation for this client only |
| Compression | None | Batches are sent as uncompressed JSON |

Credential Resolution

Same flow as metrics.LocalReporter: check the cache (5-minute TTL) → FetchSecret → nodeapi.DecryptSecret → update the cache. If the fetch or decryption fails, the reporter falls back to the stale cached token when one exists. The cache is protected by a sync.RWMutex with double-checked locking.

ReportLogs Behavior

```go
func (r *LocalReporter) ReportLogs(ctx context.Context, nodeID string, batch api.LogBatch) error
```

  1. Resolve bearer token
  2. JSON-marshal the LogBatch (identical JSON body to what the platform receives)
  3. POST to cfg.URL with Content-Type: application/json and Authorization: Bearer {token}
  4. Return nil on 2xx; return error containing the status code on non-2xx

MultiReporter

MultiReporter implements LogReporter by dispatching to both a platform and a local reporter concurrently.

Constructor

```go
func NewMultiReporter(platform, local LogReporter, logger *slog.Logger) *MultiReporter
```

Error Semantics

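| Platform result | Local result | Return value | Side effect |
| --- | --- | --- | --- |
| success | success | nil | (none) |
| error | success | platform error | (none) |
| success | error | nil | Local error logged as warning |
| error | error | platform error | Local error logged as warning |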

Only the platform error is returned. The Forwarder uses the return value to decide whether to retain a batch for retry, so local failures must not trigger batch retention.

JournalReader

Interface abstracting systemd journal access for testability.

```go
type JournalReader interface {
    ReadEntries(ctx context.Context) ([]JournalEntry, error)
}
```

JournalEntry

```go
type JournalEntry struct {
    Timestamp time.Time
    Message   string
    Priority  int
    Unit      string
}
```

JournaldSource

Collects log entries from the systemd journal via an injectable JournalReader.

Constructor

```go
func NewJournaldSource(reader JournalReader, hostname string, logger *slog.Logger) *JournaldSource
```

| Parameter | Description |
| --- | --- |
| reader | JournalReader implementation for reading entries |
| hostname | Node hostname included in every log entry |
| logger | Structured logger (log/slog) |

Field Mapping

| Journal Field | LogEntry Field | Description |
| --- | --- | --- |
| MESSAGE | Message | Log message content |
| PRIORITY | Severity | Mapped from integer to severity string |
| _SYSTEMD_UNIT | Unit | Systemd unit name (empty if absent) |
| __REALTIME_TIMESTAMP | Timestamp | Entry timestamp |
| (constant) | Source | Always "journald" |
| (constructor) | Hostname | Set at construction time |

Priority-to-Severity Mapping

| Priority | Severity |
| --- | --- |
| 0 | emerg |
| 1 | alert |
| 2 | crit |
| 3 | err |
| 4 | warning |
| 5 | notice |
| 6 | info |
| 7 | debug |

Out-of-range priority values default to "info".

Collect Behavior

Returns one api.LogEntry per journal entry. On reader error, returns nil, fmt.Errorf("logfwd: journald: %w", err). Returns nil, nil when no entries are available.

JournalctlReader

Concrete JournalReader implementation for Linux that reads entries by running the journalctl subprocess with JSON output.

```go
func NewJournalctlReader() *JournalctlReader
```

Compiled only on Linux, via the //go:build linux build constraint.

Cursor Tracking

On the first call, reads entries from the last 60 seconds (--since=60 seconds ago). Subsequent calls use --after-cursor=<cursor> to avoid re-reading entries. The cursor is extracted from the __CURSOR field of each JSON entry.

journalctl Invocation

journalctl --output=json --no-pager -n 1000 [--since=60 seconds ago | --after-cursor=<cursor>]

JSON Field Parsing

| journalctl JSON Field | JournalEntry Field | Parsing |
| --- | --- | --- |
| MESSAGE | Message | Direct string |
| _SYSTEMD_UNIT | Unit | Direct string |
| PRIORITY | Priority | String → int via strconv.Atoi |
| __REALTIME_TIMESTAMP | Timestamp | Microseconds since epoch → time.Time |
| __CURSOR | (internal) | Stored for next call |

Malformed JSON lines are silently skipped. If timestamp parsing fails, time.Now() is used as fallback. Returns error if journalctl is not found (exec.ErrNotFound).

FileSource

Reads new lines from log files matching a glob pattern, with offset tracking and rotation detection.

Constructor

```go
func NewFileSource(pattern, hostname string, logger *slog.Logger) *FileSource
```

| Parameter | Description |
| --- | --- |
| pattern | Glob expression (e.g., "/var/log/app/*.log") |
| hostname | Node hostname included in every log entry |
| logger | Structured logger (log/slog) |

Collect Behavior

  1. Expand glob pattern to matching files
  2. For each file, check inode and size against tracked state
  3. Rotation detection: if inode changed or file is smaller than stored offset, reset offset to 0
  4. Read new lines from the stored offset using bufio.Scanner
  5. Lines exceeding 16 KiB are truncated with [truncated] suffix
  6. Empty lines are skipped
  7. Each line produces an api.LogEntry with Source="file", Unit=<filepath>, Severity="info"
  8. Update stored offset after reading

Log Entry Fields

| Field | Value |
| --- | --- |
| Timestamp | time.Now() at read time |
| Source | "file" |
| Unit | Full file path |
| Message | Line content (truncated at 16 KiB) |
| Severity | "info" (always) |
| Hostname | Set at construction time |

Error Handling

  • Glob errors return an error
  • Individual file read failures are logged at warn level; other files continue
  • Partial results are returned on context cancellation

LogFilter

The FilteringSource wraps a LogSource and applies filter rules to its output. The Forwarder automatically wraps all sources with FilteringSource when Config.Filter is non-empty.

FilterConfig

```go
type FilterConfig struct {
    MinSeverity  string    // Drop entries below this severity level (empty = no filter)
    IncludeUnits []string  // Only pass entries matching these unit names (empty = all)
    ExcludeUnits []string  // Drop entries matching any of these unit names
}
```

Configured via Config.Filter:

```go
cfg := logfwd.Config{
    Filter: logfwd.FilterConfig{
        MinSeverity:  "warning",
        IncludeUnits: []string{"plexd.service", "sshd.service"},
        ExcludeUnits: []string{"systemd-resolved.service"},
    },
}
```

Severity Filtering

Uses syslog priority ordering (lower number = more severe):

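| Priority | Severity | Passes MinSeverity="warning"? |
| --- | --- | --- |
| 0 | emerg | yes |
| 1 | alert | yes |
| 2 | crit | yes |
| 3 | err | yes |
| 4 | warning | yes (threshold) |
| 5 | notice | no |
| 6 | info | no |
| 7 | debug | no |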

Unknown severity values default to "info" priority (6).

Filter Evaluation Order

  1. Severity filter: if MinSeverity is set and the entry is less severe than MinSeverity (numerically higher priority), drop
  2. Include filter: if IncludeUnits is non-empty and entry unit is not in the list, drop
  3. Exclude filter: if entry unit matches any ExcludeUnits, drop
  4. Entry passes all filters → included in output
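
The evaluation order can be sketched as a predicate. FilterConfig here mirrors the documented fields, LogEntry is a hypothetical reduction to the two fields the filter reads, and treating an unrecognized MinSeverity like "info" is an assumption (the text only specifies that fallback for entry severities).

```go
package main

import "fmt"

// LogEntry is reduced to the two fields the filter reads (hypothetical).
type LogEntry struct {
	Unit     string
	Severity string
}

// FilterConfig mirrors the documented logfwd.FilterConfig fields.
type FilterConfig struct {
	MinSeverity  string
	IncludeUnits []string
	ExcludeUnits []string
}

var priority = map[string]int{
	"emerg": 0, "alert": 1, "crit": 2, "err": 3,
	"warning": 4, "notice": 5, "info": 6, "debug": 7,
}

// rank returns the syslog priority; unknown severities count as "info" (6).
func rank(severity string) int {
	if p, ok := priority[severity]; ok {
		return p
	}
	return 6
}

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// passes applies the documented order: severity, then include, then exclude.
func (c FilterConfig) passes(e LogEntry) bool {
	if c.MinSeverity != "" && rank(e.Severity) > rank(c.MinSeverity) {
		return false // less severe than the threshold
	}
	if len(c.IncludeUnits) > 0 && !contains(c.IncludeUnits, e.Unit) {
		return false
	}
	return !contains(c.ExcludeUnits, e.Unit)
}

func main() {
	cfg := FilterConfig{
		MinSeverity:  "warning",
		IncludeUnits: []string{"plexd.service", "sshd.service"},
		ExcludeUnits: []string{"systemd-resolved.service"},
	}
	for _, e := range []LogEntry{
		{"plexd.service", "err"},
		{"plexd.service", "info"},
		{"sshd.service", "warning"},
		{"cron.service", "crit"},
	} {
		fmt.Printf("%-14s %-8s %v\n", e.Unit, e.Severity, cfg.passes(e))
	}
}
```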

Constructor

```go
func NewFilteringSource(inner LogSource, config FilterConfig) *FilteringSource
```

When FilterConfig.IsEmpty() returns true (all fields are zero/empty), the filter is a no-op passthrough.

Forwarder

Orchestrates log collection and reporting via two independent ticker loops.

Constructor

```go
func NewForwarder(cfg Config, sources []LogSource, reporter LogReporter, nodeID string, hostname string, logger *slog.Logger) *Forwarder
```

| Parameter | Description |
| --- | --- |
| cfg | Log forwarding configuration |
| sources | Slice of LogSource implementations to run each cycle |
| reporter | LogReporter for sending batches to control plane |
| nodeID | Node identifier included in report requests |
| hostname | Node hostname (passed to sources at construction) |
| logger | Structured logger (log/slog) |

RegisterSource

```go
func (f *Forwarder) RegisterSource(s LogSource)
```

Adds a log source after construction. Must be called before Run; not safe for concurrent use.

Run Method

```go
func (f *Forwarder) Run(ctx context.Context) error
```

Blocks until the context is cancelled, then returns ctx.Err(). If Enabled=false, it returns nil immediately without starting the ticker loops.

Lifecycle

```go
journalSrc := logfwd.NewJournaldSource(journalReader, hostname, logger)

fwd := logfwd.NewForwarder(cfg, []logfwd.LogSource{journalSrc}, controlPlane, nodeID, hostname, logger)

// Blocks until ctx is cancelled
err := fwd.Run(ctx)
// err == context.Canceled (normal shutdown)
```

Run Sequence

  1. If Enabled=false: log info, return nil immediately
  2. Run an immediate first collection cycle
  3. Start collect ticker (CollectInterval) and report ticker (ReportInterval)
  4. On collect tick: call each source's Collect with panic recovery, append results to mutex-protected buffer, log errors per-source but continue
  5. On report tick: swap buffer under lock, send via ReportLogs in chunks of BatchSize, log errors but continue
  6. On context cancellation: best-effort flush of remaining buffer using context.Background(), return ctx.Err()

Buffer Management

  • Collected LogEntry values are appended to an internal buffer protected by sync.Mutex
  • Buffer capacity is bounded at 2 * BatchSize entries
  • When the buffer exceeds capacity, the oldest entries are dropped and a warning is logged with the count of dropped entries
  • On report tick, the buffer is swapped out atomically (lock, copy reference, set to nil, unlock)
  • Empty buffers skip the report call entirely
  • Large batches are split into multiple API calls of at most BatchSize entries each
  • On reporter error, unsent entries are retained in the buffer for the next report cycle
  • On shutdown, remaining buffered entries are flushed with a background context

API Contract

POST /v1/nodes/{node_id}/logs

Reports a batch of log entries to the control plane.

Request body (api.LogBatch = []api.LogEntry):

```json
[
  {
    "timestamp": "2026-02-12T10:30:00Z",
    "source": "journald",
    "unit": "plexd.service",
    "message": "tunnel established with peer-abc-123",
    "severity": "info",
    "hostname": "node-01.example.com"
  },
  {
    "timestamp": "2026-02-12T10:30:01Z",
    "source": "journald",
    "unit": "sshd.service",
    "message": "Failed password for root from 192.168.1.100",
    "severity": "warning",
    "hostname": "node-01.example.com"
  }
]
```

LogEntry Schema

```go
type LogEntry struct {
    Timestamp time.Time `json:"timestamp"`
    Source    string    `json:"source"`
    Unit      string    `json:"unit"`
    Message   string    `json:"message"`
    Severity  string    `json:"severity"`
    Hostname  string    `json:"hostname"`
}
```

| Field | Type | Description |
| --- | --- | --- |
| Timestamp | time.Time | When the log entry was recorded (RFC 3339) |
| Source | string | Log source identifier (e.g. "journald" or "file") |
| Unit | string | Systemd unit name, or the file path for file-sourced entries (empty if neither applies) |
| Message | string | Log message content |
| Severity | string | Syslog severity string (see priority mapping table) |
| Hostname | string | Originating node hostname |

Error Handling

| Scenario | Behavior |
| --- | --- |
| Source returns error | Log warn, skip source, continue with others |
| Source panics | Recover panic, log error, continue with other sources |
| Reporter returns error | Log warn, retain unsent entries in buffer, retry next cycle |
| All sources fail | Empty buffer, report tick is a no-op |
| Buffer exceeds capacity | Drop oldest entries, log warn with dropped count |
| Context cancelled (shutdown) | Best-effort flush, return ctx.Err() |
| Log forwarding disabled | Return nil immediately, no goroutines started |

Logging

All log entries use component=logfwd.

| Level | Event | Keys |
| --- | --- | --- |
| Info | Log forwarding disabled | component |
| Warn | Source failed | component, error |
| Warn | Log report failed | component, error |
| Warn | Buffer overflow, dropping entries | component, dropped |
| Warn | Using cached credential | component, error |
| Warn | Local log report failed | component, error |
| Info | Local endpoint enabled | pipeline, url |

Integration Points

With api.ControlPlane

api.ControlPlane satisfies the LogReporter interface directly:

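```go
controlPlane, err := api.NewControlPlane(apiCfg, "1.0.0", logger)
if err != nil {
    return err // assumes the enclosing function returns an error
}

// controlPlane.ReportLogs matches LogReporter.ReportLogs
fwd := logfwd.NewForwarder(cfg, sources, controlPlane, nodeID, hostname, logger)
err = fwd.Run(ctx) // returns ctx.Err() on shutdown
```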

With LocalReporter and MultiReporter

When LocalEndpoint.URL is configured, the Forwarder receives a MultiReporter instead of the control plane client directly:

```go
var logReporter logfwd.LogReporter = controlPlane
if cfg.LogFwd.LocalEndpoint.URL != "" {
    localReporter := logfwd.NewLocalReporter(cfg.LogFwd.LocalEndpoint, controlPlane, nsk, identity.NodeID, logger)
    logReporter = logfwd.NewMultiReporter(controlPlane, localReporter, logger)
    logger.Info("local endpoint enabled", "pipeline", "logfwd", "url", cfg.LogFwd.LocalEndpoint.URL)
}
fwd := logfwd.NewForwarder(cfg.LogFwd, sources, logReporter, identity.NodeID, hostname, logger)
```

When LocalEndpoint.URL is empty, no MultiReporter is created and behavior is identical to the single-reporter pipeline.

Integration Tests

See Local Endpoint Integration Tests for the full integration test suite covering dual delivery, error isolation, credential resolution, and TLS skip-verify across all three pipelines.