MFO Provider Unified Documentation

Version: 1.0 Date: May 03, 2025

1. Introduction

This documentation provides guidelines to help developers extend the SLOP server by adding their own provider packages under pkg/providers/.

Welcome to the unified documentation for developing and understanding providers within the SLOP (Simple Language Open Protocol) Server. This document aims to be a comprehensive resource for developers looking to extend the capabilities of the SLOP server by creating new providers or integrating existing ones.

Purpose

The primary purpose of this documentation is to provide a clear, detailed, and technically accurate guide for developers. It covers the core architectural concepts underpinning the provider system, offers a step-by-step guide to developing new providers, and serves as a reference for the functionalities offered by the currently available providers. Additionally, it introduces a standardized way to describe provider capabilities, including prerequisites, authentication methods, and common usage scenarios, to facilitate easier integration and understanding.

Target Audience

This document is primarily intended for software developers involved in:

  • Building new providers to integrate external services or custom logic into the SLOP server.
  • Maintaining or extending existing SLOP providers.
  • Understanding the capabilities and interaction patterns of SLOP providers for integration purposes.

While the focus is technical, the inclusion of use-case scenarios and clear descriptions may also be beneficial for system architects and integrators working with the SLOP ecosystem.

Scope

This documentation covers:

  • Core Concepts: The fundamental principles of the SLOP provider architecture, including autonomy, registration, tools, webhooks, events, jobs, storage, configuration, and chaining.
  • Development Guide: A practical guide to creating a new provider, from initial setup to testing.
  • Provider Reference: Detailed descriptions of the existing providers (template_example, filesystem, llm, unipile, email_draft_preparator), including their specific tools, events, configurations, and usage scenarios with API examples.
  • API Summary: A concise overview of the relevant SLOP Server API endpoints for interacting with providers, tools, events, and jobs.

It consolidates information from previous guides and internal documentation, incorporating clarifications and adding the new standardized provider description format.


Provider Autonomy

Maintaining autonomy between providers is a core principle in the SLOP Server architecture. This ensures that providers are decoupled, modular, and easier to maintain and update independently.

Guideline: Provider packages (e.g., pkg/providers/unipile) MUST remain autonomous and decoupled from one another.

Core Principles:

  1. No Direct Cross-Provider Imports: A provider package (e.g., pkg/providers/A) MUST NOT directly import another specific provider package (e.g., pkg/providers/B). The only exception is importing shared models packages if absolutely necessary.
  2. No Direct Cross-Provider Calls: A provider package MUST NOT directly call functions or methods defined in another specific provider package.
  3. Communication via Abstraction: Interaction between providers MUST occur through abstracted mechanisms provided by the SLOP Server core:
    • Pub/Sub System: Providers publish generic events (e.g., providerA.entity.created) using the pubsub.Publisher interface. Other components (potentially dedicated listeners outside the provider packages, or event triggers) subscribe to these events to initiate actions.
    • Job Manager: Complex workflows or actions involving multiple providers should be orchestrated by creating jobs via the central jobmanager.JobManager. Jobs can be triggered by event listeners or external API calls, not directly by another provider.
    • Resource Manager: Providers can share data indirectly by creating or reading resources via the resources.Manager. Event payloads can contain resource IDs instead of large data blobs.
    • Tool Execution: A provider can execute a tool registered by another provider via the providers.ProviderRegistry or providers.Provider interface, but this should be used judiciously, typically within a job or specific workflow handler, not as a general-purpose direct call mechanism.
  4. Shared Dependencies: Providers MAY import shared, generic packages like internal core types, interfaces (internal/providers, internal/core, internal/jobmanager, internal/resources, internal/pubsub), or common model definitions, but NOT specific implementations of other providers.

Rationale:

  • Reduced Complexity: Prevents tangled dependencies and makes the system easier to understand.
  • Improved Maintainability: Changes within one provider are less likely to break others.
  • Enhanced Testability: Providers can be tested in isolation more effectively.
  • Flexibility: Allows providers to be added, removed, or replaced with minimal impact on the rest of the system.

Example Violation (Incorrect):

// pkg/providers/providerA/adapter.go
package providerA

import (
    // ... other imports
    providerB "gitlab.com/webigniter/slop-server/pkg/providers/providerB" // <-- VIOLATION: Direct import
)

func (a *AdapterA) HandleSomeEvent(...) error {
    // ... process event ...
    // Directly call a function in Provider B
    err := providerB.DoSomethingSpecific(...) // <-- VIOLATION: Direct call
    // ...
}

Example Correct Implementation (Using Pub/Sub and Jobs):

// pkg/providers/providerA/adapter.go
package providerA

import (
    "context"
    "log"

    "gitlab.com/webigniter/slop-server/internal/pubsub"
    // NO import for providerB
)

const EventProviderACompleted = "providerA.task.completed"

func (a *AdapterA) HandleSomeEvent(ctx context.Context, ...) error {
    // ... process event ...
    resultData := map[string]interface{}{ "key": "value" }

    // Publish a generic event
    if err := a.notifier.Publish(ctx, EventProviderACompleted, resultData); err != nil {
         log.Printf("ERROR publishing event %s: %v", EventProviderACompleted, err)
    }
    // DO NOT call Provider B directly.
    return nil
}

// --- Elsewhere (e.g., internal/listeners/listeners.go or via Event Trigger Config) ---

import (
    "context"

    "gitlab.com/webigniter/slop-server/internal/jobmanager"
    // Potentially import providerB's job registration details if needed
)

func handleProviderACompletion(ctx context.Context, eventPayload map[string]interface{}) {
    // Extract necessary info from eventPayload
    // Get Job Manager instance (dependency injection)
    jobMgr := getJobManagerFromContext(ctx)

    // Schedule a job defined by Provider B
    jobParams := map[string]interface{}{ /* derived from eventPayload */ }
    job := jobmanager.Job{ Type: "providerB.process_data", Parameters: jobParams }
    _, err := jobMgr.ScheduleJob(ctx, job)
    if err != nil { /* ... handle error ... */ }
}

By adhering to these principles, the SLOP Server maintains a clean, scalable, and robust architecture.

Provider Registration

For a provider to be recognized and utilized by the SLOP Server, it must be registered during the server's initialization phase. This process makes the provider's capabilities (tools, webhooks, jobs, events) available to the rest of the system.

Mechanism:

  1. Implementation: The provider must implement the providers.Provider interface.
  2. Instantiation: A factory function (e.g., NewProvider()) is typically defined within the provider's package to create an instance of the provider's main struct (adapter).
  3. Registration Call: The core of the registration happens in the pkg/providers/providers.go file. An init() function within this package calls providers.RegisterProvider() for each provider that should be included in the server build.

Steps for Developers:

  1. Create Provider Instance: Ensure your provider package has a constructor function (e.g., func NewMyProvider() *MyProvider).
  2. Modify pkg/providers/providers.go:
    • Add an import statement for your new provider package.
    • Inside the init() function, call providers.RegisterProvider() passing an instance created by your constructor.

Example (pkg/providers/providers.go):

package providers

import (
    // Core provider interface
    "gitlab.com/webigniter/slop-server/internal/providers"

    // Import existing providers
    "gitlab.com/webigniter/slop-server/pkg/providers/filesystem"
    "gitlab.com/webigniter/slop-server/pkg/providers/llm/openai"
    "gitlab.com/webigniter/slop-server/pkg/providers/template_example"
    "gitlab.com/webigniter/slop-server/pkg/providers/unipile"
    // Aliased because the package name differs from the directory name
    emaildraftpreparator "gitlab.com/webigniter/slop-server/pkg/providers/email_draft_preparator"

    // Import YOUR new provider package (aliased to match the identifier used in init)
    myprovider "gitlab.com/webigniter/slop-server/pkg/providers/my_provider" // <--- Add this import
)

func init() {
    // Register core/existing providers
    providers.RegisterProvider(filesystem.NewProvider())
    providers.RegisterProvider(openai.NewProvider())
    providers.RegisterProvider(template_example.NewProvider())
    providers.RegisterProvider(unipile.NewProvider())
    providers.RegisterProvider(emaildraftpreparator.NewProvider())

    // Register your new provider
    providers.RegisterProvider(myprovider.NewProvider()) // <--- Add this line
}

Initialization Sequence:

When the SLOP server starts:

  1. The init() function in pkg/providers/providers.go runs, calling RegisterProvider for all included providers.
  2. The central providers.Registry stores instances of each registered provider.
  3. During server setup, the core system iterates through the registered providers:
    • Calls the Initialize() method on each provider.
    • Calls RegisterTools(), RegisterWebhooks(), RegisterJobs(), RegisterEventDefinitions() (if implemented) on each provider, passing the respective registries (ToolRegistry, WebhookRegistry, JobRegistry, EventRegistry).

This ensures that all provider capabilities are properly configured and made available before the server starts accepting requests.
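
The initialization sequence can be sketched as a loop that initializes each provider and then checks, via a type assertion, whether it implements an optional capability interface. This is a simplified sketch, not the server's actual code: the Provider interface is reduced to two assumed methods, ToolRegisteringProvider and its callback signature are hypothetical, and the real server passes registry objects rather than a function.

```go
package main

import "fmt"

// Provider is a reduced stand-in for providers.Provider (assumed shape).
type Provider interface {
	Name() string
	Initialize() error
}

// ToolRegisteringProvider is a hypothetical optional capability interface.
type ToolRegisteringProvider interface {
	RegisterTools(register func(toolName string)) error
}

// Registry stores provider instances in registration order.
type Registry struct{ providers []Provider }

func (r *Registry) RegisterProvider(p Provider) { r.providers = append(r.providers, p) }

// Setup initializes every provider, then registers tools for those that
// implement the optional interface. It returns the qualified tool names.
func (r *Registry) Setup() ([]string, error) {
	var tools []string
	for _, p := range r.providers {
		if err := p.Initialize(); err != nil {
			return nil, fmt.Errorf("initialize %s: %w", p.Name(), err)
		}
		tr, ok := p.(ToolRegisteringProvider)
		if !ok {
			continue // this provider offers no tools
		}
		name := p.Name()
		err := tr.RegisterTools(func(toolName string) {
			tools = append(tools, name+"."+toolName)
		})
		if err != nil {
			return nil, fmt.Errorf("register tools for %s: %w", name, err)
		}
	}
	return tools, nil
}

type plainProvider struct{}

func (plainProvider) Name() string      { return "plain" }
func (plainProvider) Initialize() error { return nil }

type echoProvider struct{}

func (echoProvider) Name() string      { return "echo_provider" }
func (echoProvider) Initialize() error { return nil }
func (echoProvider) RegisterTools(register func(toolName string)) error {
	register("echo")
	return nil
}

func main() {
	reg := &Registry{}
	reg.RegisterProvider(plainProvider{})
	reg.RegisterProvider(echoProvider{})
	tools, err := reg.Setup()
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	fmt.Println("registered tools:", tools)
}
```

The type assertion is what makes the Register* methods optional: a provider that does not implement a capability interface is simply skipped for that capability.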

Webhooks (Incoming)

Webhooks provide a mechanism for external services to send asynchronous notifications or data to the SLOP Server. Providers can register specific endpoints to listen for these incoming requests.

Purpose:

  • To allow external systems (e.g., SaaS platforms, Git repositories, payment gateways) to push information to SLOP in real-time or near real-time.
  • To trigger internal workflows or data processing based on external events.

Mechanism:

  1. External Service Configuration: The external service is configured to send HTTP POST requests to a specific URL provided by the SLOP server.
  2. Provider Implementation: The provider implements the providers.WebhookRegisteringProvider interface.
  3. Registration: During initialization, the provider registers a specific path (e.g., /webhooks/{provider}/{webhook_id}) and a corresponding handler function using the providers.WebhookRegistry.
  4. Handling: When an HTTP request hits a registered webhook path, the SLOP server routes it to the provider's designated handler function.
  5. Processing: The handler function is responsible for:
    • Reading and parsing the request body (often JSON, but can vary).
    • Validating the payload (and potentially verifying signatures if implemented).
    • Performing minimal immediate processing.
    • Crucially: For any non-trivial processing, the handler should schedule a background job using the JobManager to avoid blocking the webhook response and ensure reliability. The job can be passed necessary information (like the payload or a reference to it).
    • Returning an appropriate HTTP status code quickly (e.g., 200 OK for immediate success, 202 Accepted if a job was scheduled).

Implementation (WebhookRegisteringProvider Interface):

// Interface definition (internal/providers/interfaces.go)
type WebhookRegisteringProvider interface {
    RegisterWebhooks(registry WebhookRegistry) error
}

// Example Provider Implementation (adapter.go)
var _ providers.WebhookRegisteringProvider = (*MyProvider)(nil)

func (p *MyProvider) RegisterWebhooks(registry providers.WebhookRegistry) error {
    // Construct a unique path for this webhook
    webhookPath := fmt.Sprintf("/webhooks/%s/my_event", ProviderName)
    err := registry.RegisterWebhook(webhookPath, p.handleMyEventWebhook)
    if err != nil {
        return fmt.Errorf("failed to register webhook %s: %w", webhookPath, err)
    }
    return nil
}

// Example Handler Implementation (e.g., webhook_handler.go)
func (p *MyProvider) handleMyEventWebhook(w http.ResponseWriter, r *http.Request) {
    // 1. Read Body
    body, err := io.ReadAll(r.Body)
    if err != nil { /* Handle error, return 400 */ return }

    // 2. Parse Payload
    var payload MyEventPayload
    if err := json.Unmarshal(body, &payload); err != nil { /* Handle error, return 400 */ return }

    // 3. Validate (Optional)
    if !isValid(payload) { /* Handle error, return 400/403 */ return }

    // 4. Schedule Job for actual processing
    jobParams := map[string]interface{}{"payload": payload} // Or store payload as resource and pass ID
    job := jobmanager.Job{ Type: "myprovider.process_event", Parameters: jobParams }
    jobID, err := p.jobManager.ScheduleJob(r.Context(), job) // Assuming jobManager is available
    if err != nil { /* Handle error, return 500 */ return }

    // 5. Respond Quickly
    w.WriteHeader(http.StatusAccepted)
    fmt.Fprintf(w, `{"status":"scheduled", "job_id":"%s"}`, jobID)
}
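
The validation step in the handler above is left open. One common approach, shown here only as a sketch, is to verify an HMAC-SHA256 signature of the raw request body against a shared secret. How the signature is transmitted (header name, encoding) depends on the external service and is not specified in this document; the hex encoding and the sign and verifySignature helpers below are assumptions for illustration.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sign returns the hex-encoded HMAC-SHA256 of body under secret.
// It plays the role of the external service in this sketch.
func sign(secret, body []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return hex.EncodeToString(mac.Sum(nil))
}

// verifySignature reports whether signatureHex is a valid hex-encoded
// HMAC-SHA256 of body. hmac.Equal compares in constant time.
func verifySignature(secret, body []byte, signatureHex string) bool {
	got, err := hex.DecodeString(signatureHex)
	if err != nil {
		return false // malformed signature
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return hmac.Equal(got, mac.Sum(nil))
}

func main() {
	secret := []byte("example-secret")
	body := []byte(`{"event":"created"}`)
	sig := sign(secret, body)

	fmt.Println("valid:", verifySignature(secret, body, sig))
	fmt.Println("tampered:", verifySignature(secret, []byte(`{"event":"deleted"}`), sig))
}
```

In a handler, the check would run on the bytes returned by io.ReadAll before the payload is parsed, and a failed check would typically result in a 401 or 403 response.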

Webhook Flow Diagram:

sequenceDiagram
    participant ExternalService
    participant SLOP_Server
    participant ProviderWebhookHandler
    participant JobManager
    participant BackgroundJob

    ExternalService->>+SLOP_Server: POST /webhooks/{provider}/{webhook_id} (Payload)
    SLOP_Server->>+ProviderWebhookHandler: Invoke handler(request)
    ProviderWebhookHandler->>ProviderWebhookHandler: Validate/Parse Payload
    alt Minimal Processing (Not Recommended for complex tasks)
        ProviderWebhookHandler-->>-SLOP_Server: 2xx OK Response
    else Background Processing Needed (Recommended)
        ProviderWebhookHandler->>+JobManager: ScheduleJob(JobType, Payload/PayloadRef)
        JobManager-->>-ProviderWebhookHandler: JobID
        ProviderWebhookHandler-->>-SLOP_Server: 202 Accepted (JobID)
        JobManager->>+BackgroundJob: Execute Job(Payload/PayloadRef)
        BackgroundJob->>BackgroundJob: Perform processing
        BackgroundJob-->>-JobManager: Job Complete/Failed
    end
    SLOP_Server-->>-ExternalService: Response (2xx)

API Access:

  • POST /webhooks/:provider/:webhook_id: The public endpoint where external services send webhook data. The exact path is determined during provider registration.

Webhooks are essential for integrating SLOP with external systems that initiate communication.

Events (Internal Pub/Sub)

Beyond receiving external webhooks, providers can publish internal events using the SLOP Server's Pub/Sub (Publish/Subscribe) system. This mechanism facilitates decoupled communication between different parts of the server, including between providers or between providers and core services.

Purpose:

  • To notify other components about significant occurrences within a provider (e.g., unipile.email.received, myprovider.document.processed).
  • To trigger subsequent actions or workflows in a decoupled manner (See Provider Chaining).
  • To enable internal automation via Event Triggers.
  • To allow external clients to subscribe to server events via Webhook Subscriptions (though the client interacts with the subscription API, not the internal Pub/Sub directly).

Mechanism:

The internal Pub/Sub system relies on the pubsub.Publisher interface (defined in internal/pubsub/interface.go). Providers that need to publish events should receive an implementation of this interface via dependency injection during their initialization.

Workflow for Publishing Events:

  1. Dependency: Ensure your provider's adapter struct includes a field for pubsub.Publisher and receives it in its constructor.
    // Example Adapter Struct
    type MyProviderAdapter struct {
        // ... other fields
        notifier pubsub.Publisher
    }
    
    // Example Constructor
    func NewMyProviderAdapter(notifier pubsub.Publisher /*, ... other deps */) *MyProviderAdapter {
        return &MyProviderAdapter{ notifier: notifier /*, ... */}
    }
    
  2. Define Event Channel: Choose a unique, descriptive channel name, typically following the pattern provider_name.resource_type.action (e.g., unipile.account.sync_status.updated, email_draft_preparator.draft.prepared).
  3. Define Payload Struct: Create a Go struct representing the data for the event. This struct should be serializable to JSON and contain all necessary information for consumers.
    // Example Payload Struct
    type MyProviderDocumentProcessedEvent struct {
        DocumentID       string            `json:"documentId"`
        Status           string            `json:"status"`
        ProcessedFileURL string            `json:"processedFileUrl,omitempty"`
        Metadata         map[string]string `json:"metadata,omitempty"`
        Timestamp        time.Time         `json:"timestamp"`
    }
    
  4. Publish: Call the notifier.Publish(ctx, channelName, payloadStruct) method where the event occurs.
    func (a *MyProviderAdapter) completeProcessing(ctx context.Context, docID string, url string) error {
        // ... processing logic ...
    
        eventPayload := MyProviderDocumentProcessedEvent{
            DocumentID:       docID,
            Status:           "completed",
            ProcessedFileURL: url,
            Timestamp:        time.Now(),
        }
        channel := "myprovider.document.processed"
    
        log.Printf("Publishing event '%s' for document %s", channel, docID)
        err := a.notifier.Publish(ctx, channel, eventPayload)
        if err != nil {
            log.Printf("ERROR publishing event '%s': %v", channel, err)
            // Decide how to handle publish errors (retry, log, ignore?)
        }
        return err // Or nil depending on criticality
    }
    

Payload Handling:

  • Serialization: The payload struct passed to Publish must be marshallable to JSON using encoding/json.
  • Large Payloads: The system (specifically the internal/pubsub/postgres implementation) automatically handles large payloads. If the marshalled JSON exceeds a certain size (~7.5KB), it's transparently stored using the resources.Manager, and only a resource reference is published internally. Providers do not need to manually check payload size before publishing.

Event Discovery:

To make events discoverable and usable by API clients (for webhook subscriptions or triggers), providers should implement the providers.EventInfoProvider interface.

// Interface definition (internal/providers/interfaces.go)
type EventInfoProvider interface {
    // GetEventDefinitions returns basic info about published events.
    GetEventDefinitions() []EventDefinition
    // GetDetailedEventInfo returns detailed info (e.g., schema, examples) about events.
    GetDetailedEventInfo() []DetailedEventInfo
}

// EventDefinition describes an event published by a provider.
type EventDefinition struct {
    Channel        string      `json:"channel"`         // Unique event channel name (e.g., provider.resource.action)
    Description    string      `json:"description"`     // User-friendly description of the event
    SourceProvider string      `json:"source_provider"` // ID of the provider publishing the event
    PayloadSchema  interface{} `json:"payload_schema,omitempty"` // Optional: JSON Schema describing the payload structure
}

// DetailedEventInfo provides extended information about an event.
type DetailedEventInfo struct {
    Channel        string      `json:"channel"`
    Description    string      `json:"description"`
    SourceProvider string      `json:"source_provider"`
    PayloadSchema  interface{} `json:"payload_schema,omitempty"`
    // ExamplePayload interface{} `json:"example_payload,omitempty"` // Optional: Example payload
    // Notes          string      `json:"notes,omitempty"`           // Optional: Additional notes
}

// Example Provider Implementation (adapter.go)
var _ providers.EventInfoProvider = (*MyProvider)(nil)

func (p *MyProvider) GetEventDefinitions() []providers.EventDefinition {
    return []providers.EventDefinition{
        {
            Channel:        "myprovider.document.processed",
            Description:    "Triggered when a document has finished processing.",
            SourceProvider: ProviderName,
            // Optionally provide a JSON schema for the payload struct
        },
        // ... other events published by this provider
    }
}

// Example implementation for detailed info (can return nil if not needed)
func (p *MyProvider) GetDetailedEventInfo() []providers.DetailedEventInfo {
    // Return nil or populate detailed info based on EventDefinitions
    return nil
}

Registered event definitions can be queried via the authenticated endpoint:

  • GET /api/events/definitions (TODO: confirm this endpoint and whether it returns the detailed event info)

Internal events are a powerful tool for building reactive and decoupled workflows within the SLOP Server.

Jobs (Background Processing)

For operations that might take a significant amount of time (e.g., processing large files, interacting with slow external APIs, complex computations), providers should utilize the SLOP Server's Job Manager to execute these tasks asynchronously in the background.

Purpose:

  • Prevent Blocking: Avoid tying up API request handlers or webhook listeners while long operations complete.
  • Improve Responsiveness: Allow the server to respond quickly to initial requests (e.g., with a 202 Accepted status and a Job ID).
  • Enhance Reliability: Jobs can be queued, managed, and potentially retried if they fail.
  • Resource Management: Allows the server to manage the execution of resource-intensive tasks more effectively.

Mechanism:

The system relies on a central jobmanager.JobManager and associated interfaces:

  • jobmanager.JobManager: Interface for scheduling and managing jobs.
  • jobmanager.JobRegistry: Interface used during initialization to register job types and their corresponding handler functions.
  • providers.JobRegisteringProvider: Interface that providers implement to register their job types.
  • jobmanager.Job: Struct representing a job instance, containing its type, parameters, schedule, status, etc.

Workflow:

  1. Define Job Type: Choose a unique string identifier for the job type, typically provider_name.job_action (e.g., my_provider.process_data, email_draft_preparator.process_email).
  2. Implement Handler: Create a function within the provider that performs the actual work. This function must match the signature func(ctx context.Context, params map[string]interface{}) error.
  3. Register Job Type: Implement the providers.JobRegisteringProvider interface and use the RegisterJobs method to associate the job type string with its handler function via the JobRegistry.
  4. Schedule Job: When a long-running task needs to be performed (often triggered by an API call or a webhook handler), obtain an instance of the JobManager (usually via context or dependency injection) and call ScheduleJob(ctx, job). The jobmanager.Job struct passed to this method includes the Type, Parameters (as map[string]interface{}), and desired Schedule (often time.Now() for immediate queuing).
  5. Execution: The Job Manager picks up queued jobs and executes their registered handler functions, passing the stored parameters.

Implementation Details:

  • Registering Job Types (JobRegisteringProvider):
    // Interface definition (internal/providers/interfaces.go)
    type JobRegisteringProvider interface {
        RegisterJobs(registry jobmanager.JobRegistry) error
    }
    
    // Example Provider Implementation (adapter.go)
    var _ providers.JobRegisteringProvider = (*MyProvider)(nil)
    
    const JobTypeProcessData = "my_provider.process_data"
    
    func (p *MyProvider) RegisterJobs(registry jobmanager.JobRegistry) error {
        err := registry.RegisterJobType(JobTypeProcessData, p.processDataJob) // Link type string to handler func
        if err != nil {
            return fmt.Errorf("failed to register job %s: %w", JobTypeProcessData, err)
        }
        return nil
    }
    
  • Implementing Job Handler:
    // Example Handler (e.g., jobs.go)
    func (p *MyProvider) processDataJob(ctx context.Context, params map[string]interface{}) error {
        // 1. Extract and validate parameters
        dataID, ok := params["data_id"].(string)
        if !ok { return fmt.Errorf("missing or invalid 'data_id' parameter") }
    
        // 2. Perform the long-running task
        log.Printf("Starting job %s for data %s", JobTypeProcessData, dataID)
        // ... interact with external services, process data ...
        time.Sleep(30 * time.Second) // Simulate work
        log.Printf("Finished job %s for data %s", JobTypeProcessData, dataID)
    
        // 3. Return nil on success, or an error on failure
        return nil
    }
    
  • Scheduling a Job:
    // Example within a Webhook Handler or API Handler
    func (p *MyProvider) handleIncomingRequest(w http.ResponseWriter, r *http.Request) {
        // ... parse request, get dataID ...
        dataID := "some-data-identifier"
    
        // Get JobManager (example: from context)
        jobManager, ok := r.Context().Value(jobmanager.JobManagerKey).(jobmanager.JobManager)
        if !ok { /* Handle error - JobManager not found */ return }
    
        // Prepare job parameters
        jobParams := map[string]interface{}{"data_id": dataID}
    
        // Create and schedule the job
        job := jobmanager.Job{
            Type:       JobTypeProcessData,
            Parameters: jobParams,
            Schedule:   time.Now(), // Queue for immediate execution
            // Name, UserID etc. might be set automatically or manually
        }
        jobID, err := jobManager.ScheduleJob(r.Context(), job)
        if err != nil { /* Handle scheduling error, return 500 */ return }
    
        // Respond to the original request
        w.WriteHeader(http.StatusAccepted)
        fmt.Fprintf(w, `{"status":"processing_scheduled", "job_id":"%s"}`, jobID)
    }
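
Because job handlers receive map[string]interface{}, parameter extraction deserves some care. If parameters are stored as JSON between scheduling and execution (an assumption; this document does not state the storage format), numbers decoded with encoding/json into interface{} values arrive as float64, so a direct .(int) assertion would fail. The helpers below are hypothetical and only sketch one way to handle this.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stringParam extracts a required string parameter.
func stringParam(params map[string]interface{}, key string) (string, error) {
	v, ok := params[key]
	if !ok {
		return "", fmt.Errorf("missing %q parameter", key)
	}
	s, ok := v.(string)
	if !ok {
		return "", fmt.Errorf("parameter %q has type %T, want string", key, v)
	}
	return s, nil
}

// intParam extracts a required whole number, accepting both int
// (in-process values) and float64 (values decoded from JSON).
func intParam(params map[string]interface{}, key string) (int, error) {
	switch n := params[key].(type) {
	case nil:
		return 0, fmt.Errorf("missing %q parameter", key)
	case int:
		return n, nil
	case float64:
		if n != float64(int(n)) {
			return 0, fmt.Errorf("parameter %q is not a whole number: %v", key, n)
		}
		return int(n), nil
	default:
		return 0, fmt.Errorf("parameter %q has type %T, want a number", key, n)
	}
}

// roundTrip simulates parameters being stored as JSON and loaded again.
func roundTrip(params map[string]interface{}) (map[string]interface{}, error) {
	raw, err := json.Marshal(params)
	if err != nil {
		return nil, err
	}
	var out map[string]interface{}
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	stored, err := roundTrip(map[string]interface{}{"data_id": "abc", "max_items": 25})
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	id, _ := stringParam(stored, "data_id")
	n, _ := intParam(stored, "max_items")
	fmt.Printf("data_id=%s max_items=%d (stored as %T)\n", id, n, stored["max_items"])
}
```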
    

API Access (Private - Requires Auth):

  • POST /api/jobs: Manually create and queue a new background job.
  • GET /api/jobs: List background jobs (filterable by status, user).
  • GET /api/jobs/:jobID: Get details of a specific background job.
  • POST /api/jobs/:jobID/cancel: Request cancellation of a running job.
  • GET /api/jobs/:jobID/history: Get the status history of a specific job.

Using the Job Manager is crucial for building robust and scalable providers that handle potentially long-running tasks without impacting server performance.

Storage

Providers often need to persist data related to their operation, such as configuration, state, user credentials, or processed results. To maintain autonomy and testability, providers should manage their own storage through dedicated interfaces.

Purpose:

  • To store provider-specific data persistently.
  • To decouple the provider's logic from the specific storage backend (e.g., PostgreSQL, Redis, in-memory).
  • To facilitate easier testing by allowing mock storage implementations.

Mechanism:

  1. Define Storage Interface: Within the provider's package, define a Go interface (e.g., Store) that specifies the data access methods required by the provider (e.g., SaveItem, GetItem, DeleteItem).
  2. Implement Interface: Create one or more concrete implementations of the storage interface. A common implementation uses PostgreSQL, often utilizing the database/sql package or libraries like pgx.
  3. Dependency Injection: The provider's adapter struct should hold an instance of the storage interface. This instance (e.g., *PgStore) is typically created during server initialization (using the shared database connection pool) and injected into the provider's constructor.
  4. Usage: The provider's methods interact with the database solely through the defined storage interface methods.

Implementation Example:

  • Define Interface (store.go):
    package myprovider
    
    import "context"
    
    // Store defines the storage interface for MyProvider
    type Store interface {
        SaveItem(ctx context.Context, item Item) error
        GetItem(ctx context.Context, id string) (*Item, error)
        // ... other methods
    }
    
    // Item represents a data item
    type Item struct {
        ID   string
        Data map[string]interface{}
        // ... other fields
    }
    
  • Implement with PostgreSQL (pg_store.go):
    package myprovider
    
    import (
        "context"
        "database/sql" // Or pgxpool.Pool
        "encoding/json"
        "fmt"
    )
    
    var _ Store = (*PgStore)(nil) // Compile-time check
    
    type PgStore struct {
        db *sql.DB // Or *pgxpool.Pool
    }
    
    func NewPgStore(db *sql.DB) *PgStore {
        return &PgStore{db: db}
    }
    
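    func (s *PgStore) SaveItem(ctx context.Context, item Item) error {
        dataJSON, err := json.Marshal(item.Data)
        if err != nil {
            // Stop here: writing a nil payload would corrupt the stored row
            return fmt.Errorf("failed to marshal data for item %s: %w", item.ID, err)
        }
    
        // Insert the item, or overwrite its data if the ID already exists
        _, err = s.db.ExecContext(ctx,
            `INSERT INTO my_provider_items (id, data) VALUES ($1, $2)
             ON CONFLICT (id) DO UPDATE SET data = $2`,
            item.ID, dataJSON)
        if err != nil {
            return fmt.Errorf("failed to save item %s: %w", item.ID, err)
        }
        return nil
    }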
    
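    func (s *PgStore) GetItem(ctx context.Context, id string) (*Item, error) {
        // Minimal sketch of the query logic, assuming the table used by SaveItem
        var dataJSON []byte
        err := s.db.QueryRowContext(ctx,
            `SELECT data FROM my_provider_items WHERE id = $1`, id).Scan(&dataJSON)
        if err != nil {
            // Wraps sql.ErrNoRows when no item has this ID
            return nil, fmt.Errorf("failed to load item %s: %w", id, err)
        }
    
        item := &Item{ID: id}
        if err := json.Unmarshal(dataJSON, &item.Data); err != nil {
            return nil, fmt.Errorf("failed to decode data for item %s: %w", id, err)
        }
        return item, nil
    }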
    // ... other methods
    
  • Inject into Adapter (adapter.go):
    package myprovider
    
    import (
        "context"
        "database/sql"
    )
    
    type MyProvider struct {
        store Store // Use the interface type
        // ... other fields
    }
    
    // Constructor receives the shared database handle and builds the concrete store
    func NewProvider(db *sql.DB) *MyProvider {
        store := NewPgStore(db) // Create the concrete store
        return &MyProvider{
            store: store, // Inject it
        }
    }
    
    func (p *MyProvider) SomeOperation(ctx context.Context, itemID string) error {
        // Use the store via the interface
        item, err := p.store.GetItem(ctx, itemID)
        if err != nil {
            return err
        }
        _ = item // ... work with the item ...
        return nil
    }
    

Database Schema:

Providers requiring database storage are responsible for defining their necessary SQL table schemas. These migrations should be managed as part of the overall SLOP Server database migration strategy.

By abstracting storage operations behind interfaces, providers remain modular and adaptable to different deployment environments or changes in storage technology.

Configuration

Providers often require configuration settings to function correctly, such as API keys, service endpoints, timeouts, or feature flags. The SLOP Server provides mechanisms for providers to load and manage these settings.

Purpose:

  • To allow customization of provider behavior without modifying code.
  • To securely manage sensitive information like API keys or credentials.
  • To adapt provider functionality to different environments (development, staging, production).

Common Approaches:

  1. Environment Variables: The most common and recommended method, especially for sensitive data or settings that vary across environments. Environment variables are easily managed in containerized deployments and CI/CD pipelines.
  2. Configuration Files: Less common for individual providers in the SLOP context, as the main server configuration might handle broader settings. If used, providers might load specific sections from a central configuration file managed by the server.
  3. Database: For dynamic configuration that needs to be updated without restarting the server (though this is less typical for initial setup parameters like API keys).

Implementation Strategy:

  1. Define Config Struct: Create a Go struct within the provider package to hold all its configuration parameters.
  2. Loading Function: Implement a function (e.g., LoadConfig()) that reads values from the environment (or other sources) and populates the config struct. This function should handle defaults and validation.
  3. Initialization: Call the LoadConfig() function during the provider's initialization phase (e.g., within the NewProvider constructor or the Initialize method). The loaded config struct can then be stored within the provider's adapter struct.

Implementation Example:

  • Define Config Struct (config.go):
    package myprovider
    
    import (
        "fmt"
        "os"
        "strconv"
        "time"
    )
    
    // Config holds the provider configuration
    type Config struct {
        APIKey      string
        APIEndpoint string
        Timeout     time.Duration
        MaxRetries  int
    }
    
    // LoadConfig loads configuration from environment variables
    func LoadConfig() (*Config, error) {
        apiKey := os.Getenv("MY_PROVIDER_API_KEY")
        if apiKey == "" {
            return nil, fmt.Errorf("MY_PROVIDER_API_KEY environment variable is required")
        }
    
        endpoint := os.Getenv("MY_PROVIDER_API_ENDPOINT")
        if endpoint == "" {
            endpoint = "https://api.example.com/v1" // Default value
        }
    
        timeoutStr := os.Getenv("MY_PROVIDER_TIMEOUT_SECONDS")
        timeout := 30 * time.Second // Default value
        if timeoutStr != "" {
            timeoutSec, err := strconv.Atoi(timeoutStr)
            if err != nil || timeoutSec <= 0 {
                // Reject non-numeric, zero, and negative timeouts
                return nil, fmt.Errorf("invalid MY_PROVIDER_TIMEOUT_SECONDS: %s", timeoutStr)
            }
            timeout = time.Duration(timeoutSec) * time.Second
        }
    
        retriesStr := os.Getenv("MY_PROVIDER_MAX_RETRIES")
        maxRetries := 3 // Default value
        if retriesStr != "" {
            var err error
            maxRetries, err = strconv.Atoi(retriesStr)
            if err != nil || maxRetries < 0 {
                return nil, fmt.Errorf("invalid MY_PROVIDER_MAX_RETRIES: %s", retriesStr)
            }
        }
    
        return &Config{
            APIKey:      apiKey,
            APIEndpoint: endpoint,
            Timeout:     timeout,
            MaxRetries:  maxRetries,
        }, nil
    }
    
  • Use in Provider (adapter.go):
    package myprovider
    
    import (
        "context"
        "fmt"
        "log"
    )
    
    type MyProvider struct {
        config *Config
        // ... other fields like store, notifier, etc.
    }
    
    func NewProvider(/* dependencies */) (*MyProvider, error) {
        cfg, err := LoadConfig()
        if err != nil {
            return nil, fmt.Errorf("failed to load my_provider config: %w", err)
        }
        log.Println("MyProvider configuration loaded successfully.")
    
        return &MyProvider{
            config: cfg,
            // ... initialize other fields
        }, nil
    }
    
    func (p *MyProvider) Initialize(ctx context.Context) error {
        // Use config values if needed during initialization
        log.Printf("Initializing MyProvider with endpoint: %s", p.config.APIEndpoint)
        // ...
        return nil
    }
    
    // Other methods can access config via p.config
    

Best Practices:

  • Environment Variables: Prefer environment variables for configuration, especially for secrets.
  • Defaults: Provide sensible default values for non-critical settings.
  • Clear Naming: Use clear and specific names for environment variables (e.g., PROVIDERNAME_SETTING_NAME).
  • Validation: Validate configuration values during loading to fail fast if settings are invalid.
  • Documentation: Clearly document all required and optional configuration settings for the provider.

Provider Chaining

Provider chaining is a powerful pattern within the SLOP Server that allows providers to collaborate by passing information sequentially. An upstream provider might generate an event (often triggered by an external webhook or internal process), which is then consumed by a downstream provider. The downstream provider processes the data and may, in turn, emit its own events for further consumption, creating a processing pipeline.

Purpose:

  • To build complex workflows by composing modular, single-purpose providers.
  • To decouple different stages of a process (e.g., data ingestion, transformation, notification).
  • To promote reusability of provider functionalities.

Mechanism:

Provider chaining typically relies on the interplay of Webhooks, Events, and Jobs:

  1. Initiation: An external event triggers a webhook handler in an upstream provider (Provider A), or an internal process in Provider A completes.
  2. Event Publication: Provider A publishes an internal event via the Pub/Sub system, containing relevant data or references (e.g., a resource ID).
  3. Event Consumption/Trigger:
    • A dedicated listener or an Event Trigger configured in the system detects the event published by Provider A.
    • This trigger schedules a background job for a downstream provider (Provider B).
  4. Job Execution: Provider B's job handler executes.
  5. Data Fetching (if needed): Provider B might need to fetch additional data, potentially by executing a tool exposed by Provider A (using the Provider Registry) or by accessing a shared resource (using the Resource Manager).
  6. Processing: Provider B performs its specific processing or transformation task.
  7. Further Publication (Optional): Provider B might publish its own event upon completion, allowing further chaining with Provider C, and so on.
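
The steps above can be sketched with a toy, synchronous model. Everything in this example is hypothetical: `Bus` and `JobQueue` are simplified stand-ins for the server's Pub/Sub system and Job Manager, and the topic and job names are invented for illustration. The real components are asynchronous and have their own APIs.

```go
package main

import "fmt"

// Event is a hypothetical internal event: a topic plus a payload of references.
type Event struct {
	Topic   string
	Payload map[string]string
}

// Bus is a toy synchronous stand-in for the Pub/Sub system.
type Bus struct {
	subs map[string][]func(Event)
}

func NewBus() *Bus { return &Bus{subs: make(map[string][]func(Event))} }

func (b *Bus) Subscribe(topic string, fn func(Event)) {
	b.subs[topic] = append(b.subs[topic], fn)
}

func (b *Bus) Publish(e Event) {
	for _, fn := range b.subs[e.Topic] {
		fn(e)
	}
}

// JobHandler is the function a provider registers for a job type.
type JobHandler func(params map[string]string) error

// JobQueue is a toy stand-in for the Job Manager; it runs jobs immediately.
type JobQueue struct {
	handlers map[string]JobHandler
}

func NewJobQueue() *JobQueue { return &JobQueue{handlers: make(map[string]JobHandler)} }

func (q *JobQueue) Register(jobType string, h JobHandler) { q.handlers[jobType] = h }

func (q *JobQueue) Schedule(jobType string, params map[string]string) error {
	h, ok := q.handlers[jobType]
	if !ok {
		return fmt.Errorf("no handler registered for job type %q", jobType)
	}
	return h(params)
}

// runChain wires Provider A -> trigger -> Provider B and returns a trace.
func runChain() []string {
	var trace []string
	bus := NewBus()
	jobs := NewJobQueue()

	// Steps 4 to 7: Provider B's job handler processes and publishes again.
	jobs.Register("provider_b.process", func(params map[string]string) error {
		trace = append(trace, "job:provider_b.process resource="+params["resource_id"])
		bus.Publish(Event{Topic: "provider_b.item.processed", Payload: params})
		return nil
	})

	// Step 3: the event trigger turns Provider A's event into a job.
	bus.Subscribe("provider_a.item.received", func(e Event) {
		trace = append(trace, "trigger:"+e.Topic)
		if err := jobs.Schedule("provider_b.process", e.Payload); err != nil {
			trace = append(trace, "error:"+err.Error())
		}
	})

	// A further consumer (Provider C) could listen here.
	bus.Subscribe("provider_b.item.processed", func(e Event) {
		trace = append(trace, "event:"+e.Topic)
	})

	// Steps 1 and 2: Provider A publishes an event carrying a resource ID.
	bus.Publish(Event{
		Topic:   "provider_a.item.received",
		Payload: map[string]string{"resource_id": "res_1"},
	})
	return trace
}

func main() {
	for _, step := range runChain() {
		fmt.Println(step)
	}
}
```

Note that neither provider calls the other directly: Provider A only knows its own topic, and Provider B only knows its job type. The trigger is the single place where the two are connected.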

Example: Unipile -> Email Draft Preparator

This chain processes incoming emails:

  1. Unipile (Provider A):
    • Receives a webhook from an external email service (e.g., Gmail) indicating a new email.
    • The webhook handler stores the raw payload as a resource.
    • Publishes an internal event unipile.email.received containing references like the account ID, email ID, and the resource ID of the raw payload.
  2. Event Trigger:
    • An event trigger is configured to listen for unipile.email.received.
    • When the event occurs, the trigger schedules a job of type email_draft_preparator.process_email.
  3. Email Draft Preparator (Provider B):
    • The process_email job handler executes.
    • It extracts the email ID and account ID from the job parameters (originating from the event payload).
    • It executes the unipile.get_email tool (belonging to Provider A) to fetch the full email details.
    • It processes the email content, potentially converting it to XML or another format.
    • It stores the processed result (e.g., as a new resource).
    • It publishes a new event, such as email.draft.prepared, containing the ID of the processed resource.

Provider Chaining Flow Diagram:

graph LR
    A[External Event Source] --> B(Provider A: Webhook/Process);
    B -- Publishes Event --> C{Internal Pub/Sub};
    C -- Event Notification --> D(Event Listener / Trigger);
    D -- Schedules Job --> E{Job Manager};
    E -- Executes Job --> F(Provider B: Job Handler);
    F -- Optional: Executes Tool --> G(Provider A: Tool Execution);
    F -- Processes Data --> F;
    F -- Optional: Publishes New Event --> C;

    subgraph "Example: Unipile -> Email Preparator"
        direction LR
        ExtEmail[External Email Service] --> UnipileWebhook{Unipile: Webhook Handler};
        UnipileWebhook -- Publishes "unipile.email.received" --> PubSubBus[Pub/Sub];
        PubSubBus -- Notifies --> Trigger(Event Trigger);
        Trigger -- Schedules Job --> JobMgr{Job Manager};
        JobMgr -- Executes Job --> EmailPrepJob(Email Preparator: process_email Job);
        EmailPrepJob -- Executes "unipile.get_email" --> UnipileTool{Unipile: Tool Execution};
        EmailPrepJob -- Processes Email --> EmailPrepJob;
        EmailPrepJob -- Publishes "email.draft.prepared" --> PubSubBus;
    end

This pattern, leveraging the core Pub/Sub and Job Manager systems while respecting Provider Autonomy, enables sophisticated, maintainable workflows within the SLOP Server.

Provider Descriptions (New Feature)

To aid developers and potentially LLMs in understanding and utilizing providers effectively, each provider's reference section should include a standardized description block. This block provides essential context beyond the basic tool and event listings.

Purpose:

  • To offer a quick overview of the provider's function and scope.
  • To clearly state any prerequisites needed before the provider can be used.
  • To explain the authentication mechanisms required, if any.
  • To illustrate typical usage patterns through common scenarios.
  • To highlight known limitations or important considerations.

Standard Structure:

Each provider's reference section (under Provider Reference) should contain a subsection formatted as follows:

*   **Overview:** A brief, one- or two-sentence summary of the provider's main purpose.
*   **Description:**
    *   **Prerequisites:** List any setup steps, external accounts, configurations (e.g., specific environment variables beyond basic API keys), or other providers that must be in place before this provider can function correctly.
    *   **Authentication:** Detail how the provider authenticates with external services (e.g., API Keys, OAuth2 flow initiated via a specific tool, credentials stored in config). Explain if user interaction is required for setup.
    *   **Common Use Cases / Scenarios:** Describe 2-3 typical ways the provider is used. For each scenario:
        *   Provide a brief narrative description.
        *   Include a sequence of API calls (using the SLOP Server API, e.g., `POST /api/tools/{tool_id}`) demonstrating the scenario. Show example JSON request bodies for the tool calls.
        *   Focus on the interaction with the SLOP server's tools, *not* the direct API calls to the underlying external service.
    *   **Limitations:** Mention any known constraints, edge cases, or aspects the provider does not cover (e.g., specific data types not supported, rate limits inherited from external services, features not yet implemented).

Example Snippet (Conceptual for a hypothetical 'Calendar' provider):

### Calendar (`calendar`)
*   **Overview:** Provides tools to interact with users' external calendar services (e.g., Google Calendar, Outlook Calendar).
*   **Description:**
    *   **Prerequisites:** Requires the `unipile` provider to be configured and the user to have connected their calendar account via Unipile.
    *   **Authentication:** Relies on the authentication tokens managed by the `unipile` provider. No separate authentication setup is needed specifically for the `calendar` provider itself, but the underlying Unipile connection must be active.
    *   **Common Use Cases / Scenarios:**
        1.  **List Upcoming Events:** Retrieve a list of the user's upcoming calendar events.
            *   Narrative: A user wants to see their schedule for the next day.
            *   API Call Sequence:
                ```
                POST /api/tools/calendar.list_events
                {
                  "account_id": "acc_12345", // Obtained via unipile.list_accounts
                  "start_time": "2025-05-04T00:00:00Z",
                  "end_time": "2025-05-05T00:00:00Z"
                }
                ```
        2.  **Create a New Event:** Add a new event to the user's calendar.
            *   Narrative: Schedule a meeting based on a user request.
            *   API Call Sequence:
                ```
                POST /api/tools/calendar.create_event
                {
                  "account_id": "acc_12345",
                  "summary": "Project Sync Meeting",
                  "start_time": "2025-05-06T14:00:00Z",
                  "end_time": "2025-05-06T15:00:00Z",
                  "attendees": ["user@example.com", "colleague@example.com"]
                }
                ```
    *   **Limitations:** Does not currently support managing recurring events or calendar sharing settings. Relies entirely on the capabilities exposed by the underlying Unipile connection.
*   **Tools:**
    *   `calendar.list_events`: ...
    *   `calendar.create_event`: ...
*   **Webhooks:** None.
*   **Events:**
    *   `calendar.event.created`: Published after a new event is successfully created via the tool.
*   **Jobs:** None.
*   **Configuration:** None (relies on Unipile configuration).
*   **Notes:** Ensure the Unipile account sync is up-to-date for accurate calendar information.

This standardized description block aims to significantly improve the usability and discoverability of provider functionalities.