zoobzio December 13, 2025 Edit this page

Error Handling

Patterns for robust error handling in LLM applications.

Error Types

zyn errors fall into categories:

| Type | Cause | Recovery |
|------|-------|----------|
| Provider errors | Network, rate limits, API failures | Retry, fallback |
| Parse errors | Invalid JSON from LLM | Retry, log for analysis |
| Validation errors | LLM output fails validation | Retry, adjust prompt |
| Timeout errors | LLM took too long | Retry, increase timeout |
| Context errors | Context canceled | Don't retry |

Basic Error Handling

// Fire the synapse and translate any failure into a caller-facing error.
// Context errors are wrapped with %w so that code further up the stack can
// still detect them with errors.Is.
result, err := synapse.Fire(ctx, session, input)
if err != nil {
    switch {
    case errors.Is(err, context.Canceled):
        // Context was canceled - don't retry
        return fmt.Errorf("operation canceled: %w", err)

    case errors.Is(err, context.DeadlineExceeded):
        // Timeout - may retry with longer timeout
        return fmt.Errorf("operation timed out: %w", err)

    case strings.Contains(err.Error(), "rate limit"):
        // Rate limited - backoff and retry
        return fmt.Errorf("rate limited, please retry later")

    case strings.Contains(err.Error(), "validation"):
        // LLM output invalid - log the details for analysis and return a
        // generic message so the raw failure is not shown to the caller.
        log.Printf("Validation error: %v", err)
        return fmt.Errorf("unable to process request")

    default:
        // Unknown error
        return fmt.Errorf("unexpected error: %w", err)
    }
}

Custom Error Handler Pipeline

Use pipz for structured error handling:

import "github.com/zoobz-io/pipz"

// Define identity for the error handler
var handleErrorsID = pipz.NewIdentity("handle-errors", "Handles and categorizes errors")

// Create error handler. It logs every failure, increments one counter per
// error category, and then returns the error unchanged.
errorHandler := pipz.Apply(handleErrorsID,
    func(ctx context.Context, e *pipz.Error[*zyn.SynapseRequest]) (*pipz.Error[*zyn.SynapseRequest], error) {
        // Log all errors
        log.Printf("Synapse error: %v (input: %s)", e.Err, e.Data.Input)

        // Track metrics by error type. The message is rendered once and
        // reused by every substring check.
        msg := e.Err.Error()
        switch {
        case strings.Contains(msg, "rate limit"):
            metrics.Increment("llm_rate_limit_errors")
        case strings.Contains(msg, "timeout"), errors.Is(e.Err, context.DeadlineExceeded):
            // context.DeadlineExceeded renders as "context deadline
            // exceeded", which has no "timeout" substring, so it is
            // matched explicitly.
            metrics.Increment("llm_timeout_errors")
        case strings.Contains(msg, "parse"):
            metrics.Increment("llm_parse_errors")
        default:
            metrics.Increment("llm_other_errors")
        }

        // Pass through (don't swallow)
        return e, nil
    },
)

// Apply to synapse. The constructor error is checked rather than discarded,
// so a failed construction is reported instead of leaving an unusable synapse.
// Adapt the return statement to the enclosing function's signature.
synapse, err := zyn.Binary("question", provider,
    zyn.WithErrorHandler(errorHandler),
    zyn.WithRetry(3),
)
if err != nil {
    return fmt.Errorf("creating binary synapse: %w", err)
}

Graceful Degradation

Provide fallback behavior:

// DegradingClassifier bundles two classification synapses with a
// last-resort label. Classify tries primary first, then fallback, and
// finally returns default_.
type DegradingClassifier struct {
    // primary is the synapse Classify fires first.
    primary  *zyn.ClassificationSynapse
    // fallback is fired only when primary returns an error.
    fallback *zyn.ClassificationSynapse
    // default_ is the label returned when both synapses fail.
    default_ string
}

// Classify labels input, degrading step by step: the primary synapse is
// tried first, then the fallback synapse, and if both fail the configured
// default label is returned with a nil error.
//
// The one case that is not degraded is a finished caller context: when ctx
// has been canceled or its deadline has passed, Classify returns ctx.Err()
// instead of a fallback or default result.
func (c *DegradingClassifier) Classify(ctx context.Context, input string) (string, error) {
    session := zyn.NewSession()

    // Try primary (complex model)
    result, err := c.primary.Fire(ctx, session, input)
    if err == nil {
        return result, nil
    }
    if ctxErr := ctx.Err(); ctxErr != nil {
        // The caller gave up; a fallback call would fail the same way.
        return "", ctxErr
    }
    log.Printf("Primary failed: %v, trying fallback", err)

    // Try fallback (simpler model) with a fresh session so the failed
    // primary exchange is not carried over.
    result, err = c.fallback.Fire(ctx, zyn.NewSession(), input)
    if err == nil {
        return result, nil
    }
    if ctxErr := ctx.Err(); ctxErr != nil {
        return "", ctxErr
    }
    log.Printf("Fallback failed: %v, using default", err)

    // Return safe default
    return c.default_, nil
}

Retry with Validation Feedback

Retry with context about previous failures:

// extractWithRetry fires synapse up to maxAttempts times against a single
// session. After each failed attempt (except the last) the error text is
// appended to the session as a user message, so the next attempt sees it.
//
// It returns the first successful result. On failure it returns the zero
// value of T and an error that wraps the last underlying failure, so callers
// can inspect it with errors.Is / errors.As. Retrying stops as soon as ctx is
// done. maxAttempts must be at least 1.
func extractWithRetry[T zyn.Validator](
    ctx context.Context,
    synapse *zyn.ExtractionSynapse[T],
    input string,
    maxAttempts int,
) (T, error) {
    var zero T
    if maxAttempts < 1 {
        return zero, fmt.Errorf("maxAttempts must be at least 1, got %d", maxAttempts)
    }

    session := zyn.NewSession()
    var lastErr error

    for attempt := 1; attempt <= maxAttempts; attempt++ {
        result, err := synapse.Fire(ctx, session, input)
        if err == nil {
            return result, nil
        }
        lastErr = err

        // A canceled or expired context cannot be fixed by retrying.
        if ctx.Err() != nil {
            return zero, fmt.Errorf("extraction aborted on attempt %d: %w", attempt, err)
        }

        if attempt < maxAttempts {
            // Add error context for next attempt
            session.Append(zyn.RoleUser, fmt.Sprintf(
                "Previous attempt failed: %v. Please try again with valid output.",
                err,
            ))
        }
    }

    return zero, fmt.Errorf("failed after %d attempts: %w", maxAttempts, lastErr)
}

Circuit Breaker with Monitoring

Track circuit breaker state:

// MonitoredSynapse pairs a binary synapse with state recorded by its error
// handler (see trackErrors), so callers can query health via IsHealthy.
type MonitoredSynapse struct {
    // synapse is assigned by NewMonitoredSynapse after construction succeeds.
    synapse      *zyn.BinarySynapse
    // circuitOpen is written by the error handler and read by IsHealthy.
    circuitOpen  atomic.Bool
    // lastFailure receives time.Now() each time the error handler runs.
    lastFailure  atomic.Value  // time.Time
}

// NewMonitoredSynapse builds a binary "question" synapse on provider with a
// circuit breaker configured as (5, 30s), and wires in an error handler that
// records state on the returned MonitoredSynapse. If the synapse cannot be
// constructed, the constructor's error is returned as-is.
func NewMonitoredSynapse(provider zyn.Provider) (*MonitoredSynapse, error) {
    monitored := new(MonitoredSynapse)

    // The handler closes over monitored, so it has to exist before the
    // synapse that will invoke it.
    handler := monitored.trackErrors()

    binary, err := zyn.Binary("question", provider,
        zyn.WithCircuitBreaker(5, 30*time.Second),
        zyn.WithErrorHandler(handler),
    )
    if err != nil {
        return nil, err
    }

    monitored.synapse = binary
    return monitored, nil
}

// Identity used to label the error-tracking processor.
var trackErrorsID = pipz.NewIdentity("track-errors", "Tracks circuit breaker state")

// trackErrors returns an error-pipeline stage that mirrors each failure into
// ms: the circuitOpen flag and the "circuit_breaker_open" gauge are set
// according to whether the error text contains "circuit open", and
// lastFailure is set to the current time. The error itself is passed through
// untouched.
func (ms *MonitoredSynapse) trackErrors() pipz.Chainable[*pipz.Error[*zyn.SynapseRequest]] {
    record := func(ctx context.Context, e *pipz.Error[*zyn.SynapseRequest]) (*pipz.Error[*zyn.SynapseRequest], error) {
        isOpen := strings.Contains(e.Err.Error(), "circuit open")
        ms.circuitOpen.Store(isOpen)

        if isOpen {
            metrics.Set("circuit_breaker_open", 1)
        } else {
            metrics.Set("circuit_breaker_open", 0)
        }

        ms.lastFailure.Store(time.Now())
        return e, nil
    }

    return pipz.Apply(trackErrorsID, record)
}

// IsHealthy reports whether the circuitOpen flag is currently unset.
func (ms *MonitoredSynapse) IsHealthy() bool {
    open := ms.circuitOpen.Load()
    return !open
}

Timeout Escalation

Increase timeout on retry:

// fireWithEscalatingTimeout asks a binary "question" synapse about input,
// doubling the per-attempt timeout after every attempt that times out.
//
// Each attempt builds a new synapse with the current timeout and a fresh
// session. The function returns:
//   - the synapse's answer on the first success;
//   - the error unchanged if it is not a deadline error, or if the caller's
//     own ctx is already done (a longer timeout cannot help then);
//   - an error wrapping the last timeout once maxAttempts is exhausted.
//
// maxAttempts must be at least 1.
func fireWithEscalatingTimeout(
    ctx context.Context,
    provider zyn.Provider,
    input string,
    initialTimeout time.Duration,
    maxAttempts int,
) (bool, error) {
    if maxAttempts < 1 {
        return false, fmt.Errorf("maxAttempts must be at least 1, got %d", maxAttempts)
    }

    timeout := initialTimeout
    var lastErr error

    for attempt := 1; attempt <= maxAttempts; attempt++ {
        // Create synapse with current timeout
        synapse, err := zyn.Binary("question", provider,
            zyn.WithTimeout(timeout),
        )
        if err != nil {
            return false, fmt.Errorf("creating synapse: %w", err)
        }

        result, err := synapse.Fire(ctx, zyn.NewSession(), input)
        if err == nil {
            return result, nil
        }

        if !errors.Is(err, context.DeadlineExceeded) {
            // Not a timeout - don't escalate
            return false, err
        }
        if ctx.Err() != nil {
            // The caller's context itself has expired or been canceled.
            return false, err
        }
        lastErr = err

        // Escalate only when another attempt will actually run.
        if attempt < maxAttempts {
            timeout *= 2
            log.Printf("Attempt %d timed out, escalating to %v", attempt, timeout)
        }
    }

    return false, fmt.Errorf("exceeded max attempts with timeout escalation: %w", lastErr)
}

Error Aggregation

Collect and report errors from batch operations:

// BatchErrors summarizes the failures seen while processing one batch.
type BatchErrors struct {
    // Total is the number of inputs in the batch.
    Total    int
    // Failed is the number of inputs whose call returned an error.
    Failed   int
    // ByType counts failures per category string from categorizeError.
    ByType   map[string]int
    // Samples holds the first few errors seen (processBatch keeps at most 5).
    Samples  []error
}

// processBatch fires synapse once per input, each with its own session, and
// returns an error summary alongside the answers. results[i] holds the answer
// for inputs[i]; entries for failed inputs keep the zero value false. Failures
// are counted, bucketed by categorizeError, and the first five are retained
// as samples.
func processBatch(ctx context.Context, synapse *zyn.BinarySynapse, inputs []string) (*BatchErrors, []bool) {
    const maxSamples = 5

    report := &BatchErrors{
        Total:   len(inputs),
        ByType:  make(map[string]int),
        Samples: make([]error, 0, maxSamples),
    }
    answers := make([]bool, len(inputs))

    for idx := range inputs {
        answer, err := synapse.Fire(ctx, zyn.NewSession(), inputs[idx])
        if err == nil {
            answers[idx] = answer
            continue
        }

        report.Failed++
        report.ByType[categorizeError(err)]++
        if len(report.Samples) < maxSamples {
            report.Samples = append(report.Samples, err)
        }
    }

    return report, answers
}

// categorizeError maps err to a coarse category label suitable for metrics:
// "rate_limit", "timeout", "parse", "validation", or "other".
//
// Classification is based on substrings of err.Error(), checked in that
// order; the first match wins. Errors that wrap context.DeadlineExceeded are
// also reported as "timeout", because their message ("context deadline
// exceeded") does not contain the word "timeout". A nil error yields "other"
// instead of panicking.
func categorizeError(err error) string {
    if err == nil {
        return "other"
    }

    // Render the message once and reuse it for every check.
    msg := err.Error()
    switch {
    case strings.Contains(msg, "rate limit"):
        return "rate_limit"
    case strings.Contains(msg, "timeout"), errors.Is(err, context.DeadlineExceeded):
        return "timeout"
    case strings.Contains(msg, "parse"):
        return "parse"
    case strings.Contains(msg, "validation"):
        return "validation"
    default:
        return "other"
    }
}

Observability Integration

Log errors with full context:

import "github.com/zoobz-io/capitan"

// Track all failures: log each failed request and count it by synapse type
// and error category.
capitan.Hook(zyn.RequestFailed, func(ctx context.Context, e *capitan.Event) {
    // The second return value of each From call is discarded.
    requestID, _ := zyn.RequestIDKey.From(e)
    synapseType, _ := zyn.SynapseTypeKey.From(e)
    task, _ := zyn.PromptTaskKey.From(e)
    errMsg, _ := zyn.ErrorKey.From(e)

    log.Printf("REQUEST_FAILED request_id=%s type=%s task=%q error=%s",
        requestID, synapseType, task, errMsg)

    // errors.New keeps the message verbatim. Passing it to fmt.Errorf as the
    // format string would misinterpret any '%' the message contains.
    metrics.Increment("synapse_failures",
        "type", synapseType,
        "error_category", categorizeError(errors.New(errMsg)),
    )
})

// Track parse/validation failures separately: every event is logged with a
// truncated copy of the response, and events whose error type is
// "parse_error" additionally raise a warning alert.
capitan.Hook(zyn.ResponseParseFailed, func(ctx context.Context, e *capitan.Event) {
    reqID, _ := zyn.RequestIDKey.From(e)
    kind, _ := zyn.ErrorTypeKey.From(e)
    raw, _ := zyn.ResponseKey.From(e)

    log.Printf("RESPONSE_PARSE_FAILED request_id=%s error_type=%s response=%s",
        reqID, kind, truncate(raw, 200))

    if kind != "parse_error" {
        return
    }

    // These warrant investigation - LLM producing invalid output
    details := map[string]string{
        "request_id": reqID,
        "response":   truncate(raw, 500),
    }
    alerting.Warn("LLM parse error", details)
})

Next Steps