zoobzio December 13, 2025 Edit this page

Extraction Pipelines

Patterns for extracting structured data from unstructured text.

Contact Information Extractor

Extract contact details from emails or documents:

package main

import (
    "context"
    "fmt"
    "regexp"

    "github.com/zoobz-io/zyn"
)

// Contact is the structured result of a contact-information extraction.
// Phone, Company and Title are left out of the JSON encoding when empty.
type Contact struct {
    // Name must be non-empty for Validate to succeed.
    Name string `json:"name"`
    // Email may be empty; when set it must pass isValidEmail.
    Email   string `json:"email"`
    Phone   string `json:"phone,omitempty"`
    Company string `json:"company,omitempty"`
    Title   string `json:"title,omitempty"`
}

// Validate reports the first problem found with the contact: a missing name
// is checked before the email format. An empty email is accepted.
func (c Contact) Validate() error {
    switch {
    case c.Name == "":
        return fmt.Errorf("name is required")
    case c.Email != "" && !isValidEmail(c.Email):
        return fmt.Errorf("invalid email format: %s", c.Email)
    default:
        return nil
    }
}

// emailRegexp is compiled once at package initialization instead of on every
// isValidEmail call. The pattern is a constant, so MustCompile cannot panic
// at runtime for a reason that would not also fail on the very first call.
var emailRegexp = regexp.MustCompile(`^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`)

// isValidEmail reports whether email matches a simple local@domain.tld shape:
// one or more allowed characters, an '@', a domain, a dot, and an alphabetic
// top-level part of at least two letters. It is a format check only.
func isValidEmail(email string) bool {
    return emailRegexp.MatchString(email)
}

// ContactExtractor wraps an extraction synapse whose result type is Contact.
// Build one with NewContactExtractor.
type ContactExtractor struct {
    // synapse is the zyn extraction synapse that Extract fires.
    synapse *zyn.ExtractionSynapse[Contact]
}

// NewContactExtractor builds a ContactExtractor on top of the given provider.
// The underlying synapse is created with zyn.WithRetry(2); any error from
// zyn.Extract is returned unchanged together with a nil extractor.
func NewContactExtractor(provider zyn.Provider) (*ContactExtractor, error) {
    const task = "Extract contact information including name, email, phone, company, and job title"

    syn, err := zyn.Extract[Contact](task, provider, zyn.WithRetry(2))
    if err != nil {
        return nil, err
    }

    extractor := &ContactExtractor{synapse: syn}
    return extractor, nil
}

// Extract fires the underlying synapse on text using a fresh session created
// for this call, and returns the resulting Contact and error as-is.
func (e *ContactExtractor) Extract(ctx context.Context, text string) (Contact, error) {
    return e.synapse.Fire(ctx, zyn.NewSession(), text)
}

// Usage
//
// main builds a ContactExtractor, runs it over a short sample message and
// prints the extracted contact. Errors from construction and extraction are
// reported instead of being discarded.
//
// NOTE(review): provider is not declared in this snippet; it is assumed to be
// a configured zyn.Provider supplied elsewhere. Confirm before running.
func main() {
    extractor, err := NewContactExtractor(provider)
    if err != nil {
        fmt.Println("creating contact extractor:", err)
        return
    }

    text := `
        Hi, I'm John Smith from Acme Corp. I'm the VP of Engineering.
        You can reach me at john.smith@acme.com or call (555) 123-4567.
    `

    contact, err := extractor.Extract(context.Background(), text)
    if err != nil {
        fmt.Println("extracting contact:", err)
        return
    }

    // Expected shape of the result for the sample text above:
    // contact = {Name: "John Smith", Email: "john.smith@acme.com", Phone: "(555) 123-4567", Company: "Acme Corp", Title: "VP of Engineering"}
    fmt.Printf("%+v\n", contact)
}

Invoice Data Extraction

Extract structured invoice data:

// LineItem is a single row of an invoice.
type LineItem struct {
    Description string  `json:"description"`
    Quantity    int     `json:"quantity"`
    UnitPrice   float64 `json:"unit_price"`
    // Total is the amount Invoice.Validate sums and compares to Subtotal.
    Total float64 `json:"total"`
}

// Invoice is the structured form of an extracted invoice.
type Invoice struct {
    InvoiceNumber string     `json:"invoice_number"`
    Date          string     `json:"date"`
    Vendor        string     `json:"vendor"`
    Items         []LineItem `json:"items"`
    Subtotal      float64    `json:"subtotal"`
    Tax           float64    `json:"tax"`
    Total         float64    `json:"total"`
}

// Validate checks the invoice for internal consistency and returns the first
// problem found, in this order:
//
//   - InvoiceNumber must be non-empty.
//   - Items must contain at least one line item.
//   - Total must be strictly positive.
//   - The sum of the line item totals must equal Subtotal to within 0.01.
//
// Date, Vendor and Tax are not checked.
func (i Invoice) Validate() error {
    if i.InvoiceNumber == "" {
        return fmt.Errorf("invoice number required")
    }
    if len(i.Items) == 0 {
        return fmt.Errorf("at least one line item required")
    }
    if i.Total <= 0 {
        return fmt.Errorf("total must be positive")
    }

    // Validate line items sum
    var itemsTotal float64
    for _, item := range i.Items {
        itemsTotal += item.Total
    }

    // Go has no builtin abs for floats, so the absolute difference is checked
    // as a two-sided comparison. The tolerance absorbs floating-point rounding.
    const tolerance = 0.01
    if diff := itemsTotal - i.Subtotal; diff > tolerance || diff < -tolerance {
        return fmt.Errorf("line items don't sum to subtotal")
    }

    return nil
}

// extractInvoice creates an Invoice extractor on the given provider and fires
// it once on invoiceText with a fresh session.
//
// If the extractor cannot be created, a zero Invoice and a wrapped error are
// returned; otherwise the result and error from Fire are returned as-is.
func extractInvoice(ctx context.Context, provider zyn.Provider, invoiceText string) (Invoice, error) {
    extractor, err := zyn.Extract[Invoice](
        "Extract invoice details including number, date, vendor, line items, and totals",
        provider,
    )
    if err != nil {
        // Without this check a failed construction would go on to call Fire
        // on an unusable extractor.
        return Invoice{}, fmt.Errorf("creating invoice extractor: %w", err)
    }

    session := zyn.NewSession()
    return extractor.Fire(ctx, session, invoiceText)
}

Multi-Step Extraction Pipeline

Extract complex data in stages, reusing one session across the steps:

// RawEvent is the first-pass extraction result: the date and location are
// kept as unparsed text for later steps.
type RawEvent struct {
    // Title must be non-empty for Validate to succeed.
    Title       string `json:"title"`
    Description string `json:"description"`
    RawDate     string `json:"raw_date"`
    RawLocation string `json:"raw_location"`
}

// Validate requires a non-empty Title; no other field is checked.
func (r RawEvent) Validate() error {
    if r.Title != "" {
        return nil
    }
    return fmt.Errorf("title required")
}

// ParsedDate is a date and time broken into numeric components plus a
// timezone string.
type ParsedDate struct {
    // Year is the only field Validate checks.
    Year   int    `json:"year"`
    Month  int    `json:"month"`
    Day    int    `json:"day"`
    Hour   int    `json:"hour"`
    Minute int    `json:"minute"`
    TZ     string `json:"timezone"`
}

// Validate accepts years from 2000 through 2100 inclusive and rejects
// anything else. The remaining fields are not validated.
func (p ParsedDate) Validate() error {
    if p.Year >= 2000 && p.Year <= 2100 {
        return nil
    }
    return fmt.Errorf("invalid year: %d", p.Year)
}

// Location is a place broken into structured components. Lat and Lng are
// omitted from the JSON encoding when zero.
type Location struct {
    Venue   string `json:"venue"`
    Address string `json:"address"`
    // City must be non-empty for Validate to succeed.
    City  string  `json:"city"`
    State string  `json:"state"`
    Lat   float64 `json:"latitude,omitempty"`
    Lng   float64 `json:"longitude,omitempty"`
}

// Validate requires a non-empty City; no other field is checked.
func (l Location) Validate() error {
    if l.City != "" {
        return nil
    }
    return fmt.Errorf("city required")
}

// Event is the combined result returned by extractEvent: the title and
// description from the raw extraction plus the separately parsed date and
// location. It carries no JSON tags.
type Event struct {
    Title       string
    Description string
    Date        ParsedDate
    Location    Location
}

// extractEvent builds an Event from free-form text in three steps that all
// use the same session:
//
//  1. extract a RawEvent from text,
//  2. parse the raw date text into a ParsedDate,
//  3. parse the raw location text into a Location.
//
// The first failure aborts the pipeline; a nil Event is returned together
// with an error that wraps the cause and names the failing step. This covers
// both creating an extractor and firing it.
func extractEvent(ctx context.Context, provider zyn.Provider, text string) (*Event, error) {
    session := zyn.NewSession()

    // Step 1: Extract raw event data
    rawExtractor, err := zyn.Extract[RawEvent]("Extract event title, description, date text, and location text", provider)
    if err != nil {
        return nil, fmt.Errorf("creating raw event extractor: %w", err)
    }
    raw, err := rawExtractor.Fire(ctx, session, text)
    if err != nil {
        return nil, fmt.Errorf("raw extraction failed: %w", err)
    }

    // Step 2: Parse date (same session as step 1)
    dateExtractor, err := zyn.Extract[ParsedDate]("Parse date into structured components", provider)
    if err != nil {
        return nil, fmt.Errorf("creating date extractor: %w", err)
    }
    date, err := dateExtractor.Fire(ctx, session, raw.RawDate)
    if err != nil {
        return nil, fmt.Errorf("date parsing failed: %w", err)
    }

    // Step 3: Parse location (same session as steps 1 and 2)
    locationExtractor, err := zyn.Extract[Location]("Parse location into structured components", provider)
    if err != nil {
        return nil, fmt.Errorf("creating location extractor: %w", err)
    }
    location, err := locationExtractor.Fire(ctx, session, raw.RawLocation)
    if err != nil {
        return nil, fmt.Errorf("location parsing failed: %w", err)
    }

    return &Event{
        Title:       raw.Title,
        Description: raw.Description,
        Date:        date,
        Location:    location,
    }, nil
}

Extraction with Fallback

Handle extraction failures gracefully by falling back to a simpler schema:

// ProductInfo is the detailed product schema tried first by extractProduct.
type ProductInfo struct {
    // Name must be non-empty for Validate to succeed.
    Name string `json:"name"`
    // Price must not be negative; zero is accepted.
    Price    float64  `json:"price"`
    Currency string   `json:"currency"`
    Features []string `json:"features"`
}

// Validate returns the first problem found: a missing name is reported
// before a negative price. Currency and Features are not checked.
func (p ProductInfo) Validate() error {
    switch {
    case p.Name == "":
        return fmt.Errorf("product name required")
    case p.Price < 0:
        return fmt.Errorf("price cannot be negative")
    }
    return nil
}

// BasicProductInfo is the reduced product schema used as a fallback by
// extractProduct.
type BasicProductInfo struct {
    // Name must be non-empty for Validate to succeed.
    Name        string `json:"name"`
    Description string `json:"description"`
}

// Validate requires a non-empty Name; Description is not checked.
func (b BasicProductInfo) Validate() error {
    if b.Name != "" {
        return nil
    }
    return fmt.Errorf("name required")
}

// extractProduct tries to extract a ProductInfo from text and, if that does
// not succeed, falls back to extracting a BasicProductInfo.
//
// The dynamic type of the returned value tells the caller which schema was
// used. The detailed and basic attempts each use their own fresh session.
//
// A failure to create the detailed extractor is treated like a failed
// detailed extraction and triggers the fallback. A failure to create the
// basic extractor is returned as a wrapped error with a nil value. Otherwise
// the result and error of the basic Fire call are returned as-is.
func extractProduct(ctx context.Context, provider zyn.Provider, text string) (any, error) {
    // Try detailed extraction first
    detailed, err := zyn.Extract[ProductInfo]("Extract detailed product information", provider)
    if err == nil {
        product, fireErr := detailed.Fire(ctx, zyn.NewSession(), text)
        if fireErr == nil {
            return product, nil
        }
    }

    // Fall back to basic extraction
    basic, err := zyn.Extract[BasicProductInfo]("Extract basic product name and description", provider)
    if err != nil {
        return nil, fmt.Errorf("creating basic product extractor: %w", err)
    }
    return basic.Fire(ctx, zyn.NewSession(), text)
}

Batch Extraction with Progress

Extract from multiple documents with progress tracking:

// ExtractionProgress is the snapshot extractBatch sends on its progress
// channel after each document.
type ExtractionProgress struct {
    // Total is the number of documents in the batch.
    Total int
    // Completed counts documents processed so far, whether or not they failed.
    Completed int
    // Errors counts documents whose extraction returned an error.
    Errors int
}

// extractBatch fires synapse once per document, in order, each with its own
// fresh session, and reports progress after every document.
//
// Both returned slices have len(documents) entries and are index-aligned with
// documents: for a document that failed, the error slot holds the error and
// the result slot keeps T's zero value; for one that succeeded, the error
// slot is nil.
//
// When progressCh is non-nil, an updated ExtractionProgress is sent on it
// after each document. This is a plain channel send, so the caller must be
// receiving (or the channel must have buffer space) for the loop to advance.
func extractBatch[T zyn.Validator](
    ctx context.Context,
    synapse *zyn.ExtractionSynapse[T],
    documents []string,
    progressCh chan<- ExtractionProgress,
) ([]T, []error) {
    count := len(documents)
    out := make([]T, count)
    errs := make([]error, count)
    status := ExtractionProgress{Total: count}

    for idx := range documents {
        value, err := synapse.Fire(ctx, zyn.NewSession(), documents[idx])
        if err == nil {
            out[idx] = value
        } else {
            errs[idx] = err
            status.Errors++
        }

        status.Completed++
        if progressCh == nil {
            continue
        }
        progressCh <- status
    }

    return out, errs
}

Next Steps