Error Handling
Patterns for robust error handling in LLM applications.
Error Types
zyn errors fall into five categories, each with its own recovery strategy:
| Type | Cause | Recovery |
|---|---|---|
| Provider errors | Network, rate limits, API failures | Retry, fallback |
| Parse errors | Invalid JSON from LLM | Retry, log for analysis |
| Validation errors | LLM output fails validation | Retry, adjust prompt |
| Timeout errors | LLM took too long | Retry, increase timeout |
| Context errors | Context canceled | Don't retry |
Basic Error Handling
// Fire the synapse and translate any failure into an error for the caller.
result, err := synapse.Fire(ctx, session, input)
if err != nil {
	switch {
	case errors.Is(err, context.Canceled):
		// Context was canceled - don't retry. Wrapping with %w keeps the
		// cause in the chain so callers can still match it with errors.Is.
		return fmt.Errorf("operation canceled: %w", err)
	case errors.Is(err, context.DeadlineExceeded):
		// Timeout - may retry with longer timeout. Wrapped for the same
		// reason as the cancellation case.
		return fmt.Errorf("operation timed out: %w", err)
	case strings.Contains(err.Error(), "rate limit"):
		// Rate limited - backoff and retry
		return errors.New("rate limited, please retry later")
	case strings.Contains(err.Error(), "validation"):
		// LLM output invalid - log for analysis; the returned error is
		// deliberately generic, so the detail lives only in the log.
		log.Printf("Validation error: %v", err)
		return errors.New("unable to process request")
	default:
		// Unknown error
		return fmt.Errorf("unexpected error: %w", err)
	}
}
Custom Error Handler Pipeline
Use pipz for structured error handling:
import "github.com/zoobz-io/pipz"
// Define identity for the error handler
var handleErrorsID = pipz.NewIdentity("handle-errors", "Handles and categorizes errors")

// Create error handler: it logs every synapse error, bumps one counter per
// error category, and passes the error through unchanged.
errorHandler := pipz.Apply(handleErrorsID,
	func(ctx context.Context, e *pipz.Error[*zyn.SynapseRequest]) (*pipz.Error[*zyn.SynapseRequest], error) {
		// Log all errors
		log.Printf("Synapse error: %v (input: %s)", e.Err, e.Data.Input)

		// Track metrics by error type
		msg := e.Err.Error()
		switch {
		case strings.Contains(msg, "rate limit"):
			metrics.Increment("llm_rate_limit_errors")
		case errors.Is(e.Err, context.DeadlineExceeded), strings.Contains(msg, "timeout"):
			// context.DeadlineExceeded reads "context deadline exceeded",
			// which has no "timeout" substring, so match it by identity too.
			metrics.Increment("llm_timeout_errors")
		case strings.Contains(msg, "parse"):
			metrics.Increment("llm_parse_errors")
		default:
			metrics.Increment("llm_other_errors")
		}

		// Pass through (don't swallow)
		return e, nil
	},
)
// Apply to synapse. The constructor's error is checked rather than
// discarded, so a failed construction is reported instead of leaving an
// unusable synapse behind.
synapse, err := zyn.Binary("question", provider,
	zyn.WithErrorHandler(errorHandler),
	zyn.WithRetry(3),
)
if err != nil {
	return fmt.Errorf("creating binary synapse: %w", err)
}
Graceful Degradation
Provide fallback behavior:
// DegradingClassifier holds two classification synapses and a fixed label.
// Classify consults them in the order primary, fallback, default_.
type DegradingClassifier struct {
	// primary is fired first; fallback is fired only when primary errors.
	primary, fallback *zyn.ClassificationSynapse

	// default_ is the label Classify returns when both synapses fail.
	default_ string
}
// Classify returns a label for input, degrading in two steps: the primary
// synapse is fired first, the fallback synapse second, and if both return an
// error the configured default label is used. Each synapse gets its own fresh
// session. Failures are logged, and the returned error is always nil.
func (c *DegradingClassifier) Classify(ctx context.Context, input string) (string, error) {
	// Try primary (complex model)
	label, primaryErr := c.primary.Fire(ctx, zyn.NewSession(), input)
	if primaryErr == nil {
		return label, nil
	}
	log.Printf("Primary failed: %v, trying fallback", primaryErr)

	// Try fallback (simpler model)
	label, fallbackErr := c.fallback.Fire(ctx, zyn.NewSession(), input)
	if fallbackErr != nil {
		log.Printf("Fallback failed: %v, using default", fallbackErr)
		// Return safe default
		return c.default_, nil
	}
	return label, nil
}
Retry with Validation Feedback
Retry, appending each failure to the session so the next attempt can see what went wrong:
// extractWithRetry fires synapse up to maxAttempts times against a single
// session. After each failed attempt except the last, the error text is
// appended to the session as a user message before the next attempt.
//
// It returns the first successful result. On failure it returns the zero
// value of T and an error wrapping the last failure, so callers can inspect
// the cause with errors.Is / errors.As. Retrying stops as soon as ctx is
// done, because further attempts cannot succeed once the caller has given up.
func extractWithRetry[T zyn.Validator](
	ctx context.Context,
	synapse *zyn.ExtractionSynapse[T],
	input string,
	maxAttempts int,
) (T, error) {
	var zero T
	var lastErr error
	session := zyn.NewSession()

	for attempt := 1; attempt <= maxAttempts; attempt++ {
		result, err := synapse.Fire(ctx, session, input)
		if err == nil {
			return result, nil
		}
		lastErr = err

		// Context canceled or expired: don't retry.
		if ctx.Err() != nil {
			return zero, fmt.Errorf("aborted on attempt %d: %w", attempt, err)
		}

		if attempt < maxAttempts {
			// Add error context for next attempt
			session.Append(zyn.RoleUser, fmt.Sprintf(
				"Previous attempt failed: %v. Please try again with valid output.",
				err,
			))
		}
	}

	if lastErr == nil {
		// maxAttempts < 1: the loop never ran, so there is no cause to wrap.
		return zero, fmt.Errorf("failed after %d attempts", maxAttempts)
	}
	return zero, fmt.Errorf("failed after %d attempts: %w", maxAttempts, lastErr)
}
Circuit Breaker with Monitoring
Track circuit breaker state:
// MonitoredSynapse wraps a binary synapse together with the state its error
// handler records: whether the most recent error was a "circuit open" error,
// and when the most recent error was seen.
type MonitoredSynapse struct {
	// synapse is the wrapped binary synapse, set by NewMonitoredSynapse.
	synapse *zyn.BinarySynapse

	// circuitOpen is written by the error handler and read by IsHealthy.
	circuitOpen atomic.Bool

	// lastFailure holds the time.Time at which the error handler last ran.
	lastFailure atomic.Value // time.Time
}
// NewMonitoredSynapse builds a "question" binary synapse on provider,
// configured with a circuit breaker (arguments 5 and 30s) and with the
// monitor's own error tracker installed as the error handler. It returns the
// construction error unchanged if the synapse cannot be created.
func NewMonitoredSynapse(provider zyn.Provider) (*MonitoredSynapse, error) {
	// The monitor must exist before the synapse, because the error handler
	// passed to the constructor closes over it.
	monitored := new(MonitoredSynapse)

	bin, err := zyn.Binary("question", provider,
		zyn.WithCircuitBreaker(5, 30*time.Second),
		zyn.WithErrorHandler(monitored.trackErrors()),
	)
	if err != nil {
		return nil, err
	}

	monitored.synapse = bin
	return monitored, nil
}
// Identity for tracking errors
var trackErrorsID = pipz.NewIdentity("track-errors", "Tracks circuit breaker state")

// trackErrors returns the error handler that keeps ms up to date. For every
// error it sets circuitOpen (and the "circuit_breaker_open" gauge) according
// to whether the error text contains "circuit open", records the current time
// in lastFailure, and passes the error through unchanged.
func (ms *MonitoredSynapse) trackErrors() pipz.Chainable[*pipz.Error[*zyn.SynapseRequest]] {
	record := func(ctx context.Context, e *pipz.Error[*zyn.SynapseRequest]) (*pipz.Error[*zyn.SynapseRequest], error) {
		tripped := strings.Contains(e.Err.Error(), "circuit open")
		ms.circuitOpen.Store(tripped)
		if tripped {
			metrics.Set("circuit_breaker_open", 1)
		} else {
			metrics.Set("circuit_breaker_open", 0)
		}

		ms.lastFailure.Store(time.Now())
		return e, nil
	}
	return pipz.Apply(trackErrorsID, record)
}
// IsHealthy reports whether the circuit is currently recorded as closed,
// i.e. the error handler has not flagged a "circuit open" error as the most
// recent failure.
func (ms *MonitoredSynapse) IsHealthy() bool {
	open := ms.circuitOpen.Load()
	return !open
}
Timeout Escalation
Increase timeout on retry:
// fireWithEscalatingTimeout fires a "question" binary synapse up to
// maxAttempts times, starting with initialTimeout and doubling the timeout
// after every attempt that fails with context.DeadlineExceeded.
//
// It returns the first successful result. It returns early, without
// escalating, when the synapse cannot be constructed, when the failure is not
// a deadline error, or when ctx itself is done (a longer per-attempt timeout
// cannot help once the caller's own context has expired). When every attempt
// times out, the returned error wraps the last timeout.
func fireWithEscalatingTimeout(
	ctx context.Context,
	provider zyn.Provider,
	input string,
	initialTimeout time.Duration,
	maxAttempts int,
) (bool, error) {
	timeout := initialTimeout
	var lastErr error

	for attempt := 1; attempt <= maxAttempts; attempt++ {
		// Create synapse with current timeout
		synapse, err := zyn.Binary("question", provider,
			zyn.WithTimeout(timeout),
		)
		if err != nil {
			return false, fmt.Errorf("creating synapse for attempt %d: %w", attempt, err)
		}

		result, err := synapse.Fire(ctx, zyn.NewSession(), input)
		if err == nil {
			return result, nil
		}
		if !errors.Is(err, context.DeadlineExceeded) {
			// Not a timeout - don't escalate
			return false, err
		}
		if ctx.Err() != nil {
			// The caller's context is done - retrying cannot succeed.
			return false, err
		}
		lastErr = err

		// Escalate timeout for next attempt; skip this after the final
		// attempt so the log never announces an escalation that won't happen.
		if attempt < maxAttempts {
			timeout *= 2
			log.Printf("Attempt %d timed out, escalating to %v", attempt, timeout)
		}
	}

	if lastErr == nil {
		// maxAttempts < 1: no attempt was made, so there is no cause to wrap.
		return false, fmt.Errorf("exceeded max attempts with timeout escalation")
	}
	return false, fmt.Errorf("exceeded max attempts with timeout escalation: %w", lastErr)
}
Error Aggregation
Collect and report errors from batch operations:
// BatchErrors summarizes the failures seen while processing one batch.
type BatchErrors struct {
	// Total is the number of inputs in the batch; Failed is how many of
	// them ended in an error.
	Total, Failed int

	// ByType counts failures per error category.
	ByType map[string]int

	// Samples keeps a few of the errors encountered, for inspection.
	Samples []error
}
// processBatch fires synapse once per input, each with a fresh session, and
// returns a failure summary together with the per-input results.
//
// results[i] holds the synapse's answer for inputs[i]; it stays false when
// that call failed, so consult the summary to tell a failure from a genuine
// false. The summary counts failures by category (see categorizeError) and
// retains at most maxSamples of the errors.
func processBatch(ctx context.Context, synapse *zyn.BinarySynapse, inputs []string) (*BatchErrors, []bool) {
	// maxSamples caps how many errors are kept for inspection.
	const maxSamples = 5

	// Named report rather than errors so the stdlib errors package is not
	// shadowed inside this function.
	report := &BatchErrors{
		Total:   len(inputs),
		ByType:  make(map[string]int),
		Samples: make([]error, 0, maxSamples),
	}
	results := make([]bool, len(inputs))

	for i, input := range inputs {
		result, err := synapse.Fire(ctx, zyn.NewSession(), input)
		if err != nil {
			report.Failed++
			report.ByType[categorizeError(err)]++
			if len(report.Samples) < maxSamples {
				report.Samples = append(report.Samples, err)
			}
			continue
		}
		results[i] = result
	}
	return report, results
}
// categorizeError maps err to a coarse category label: "rate_limit",
// "timeout", "parse", "validation", or "other".
//
// Categories are matched on the error text, checked in the order listed.
// Timeouts are additionally recognized by identity: an error that is or wraps
// context.DeadlineExceeded counts as "timeout", because that error's text
// ("context deadline exceeded") does not contain the word "timeout".
// A nil err yields "other" instead of panicking.
func categorizeError(err error) string {
	if err == nil {
		return "other"
	}

	msg := err.Error()
	switch {
	case strings.Contains(msg, "rate limit"):
		return "rate_limit"
	case errors.Is(err, context.DeadlineExceeded), strings.Contains(msg, "timeout"):
		return "timeout"
	case strings.Contains(msg, "parse"):
		return "parse"
	case strings.Contains(msg, "validation"):
		return "validation"
	default:
		return "other"
	}
}
Observability Integration
Log errors with full context:
import "github.com/zoobz-io/capitan"
// Track all failures: log each failed request with its identifying fields and
// count it under "synapse_failures", tagged by synapse type and error category.
capitan.Hook(zyn.RequestFailed, func(ctx context.Context, e *capitan.Event) {
	// The second return value of each lookup is discarded; if a lookup
	// yields nothing, whatever value it returned is logged as-is.
	requestID, _ := zyn.RequestIDKey.From(e)
	synapseType, _ := zyn.SynapseTypeKey.From(e)
	task, _ := zyn.PromptTaskKey.From(e)
	errMsg, _ := zyn.ErrorKey.From(e)

	log.Printf("REQUEST_FAILED request_id=%s type=%s task=%q error=%s",
		requestID, synapseType, task, errMsg)

	metrics.Increment("synapse_failures",
		"type", synapseType,
		// errMsg is event data, not a format string: pass it as an argument
		// so any '%' it contains is not interpreted as a formatting verb.
		"error_category", categorizeError(fmt.Errorf("%s", errMsg)),
	)
})
// Track parse/validation failures separately: every event is logged with a
// response preview truncated to 200, and events whose error type is
// "parse_error" also raise a warning carrying a preview truncated to 500.
capitan.Hook(zyn.ResponseParseFailed, func(ctx context.Context, e *capitan.Event) {
	reqID, _ := zyn.RequestIDKey.From(e)
	kind, _ := zyn.ErrorTypeKey.From(e)
	raw, _ := zyn.ResponseKey.From(e)

	log.Printf("RESPONSE_PARSE_FAILED request_id=%s error_type=%s response=%s",
		reqID, kind, truncate(raw, 200))

	// Only parse errors are escalated to an alert.
	if kind != "parse_error" {
		return
	}

	// These warrant investigation - LLM producing invalid output
	details := map[string]string{
		"request_id": reqID,
		"response":   truncate(raw, 500),
	}
	alerting.Warn("LLM parse error", details)
})
Next Steps
- Reliability Guide - Built-in reliability patterns
- Observability Guide - Monitoring setup