Core Concepts for Providers

Provider Architecture

Providers in the SLOP Server are designed as pluggable, autonomous modules that extend the server's core functionality. They act as bridges to external services or implement specific internal capabilities.

Key Characteristics:

  • Modularity: Each provider encapsulates a specific domain or service integration.
  • Autonomy: Providers manage their own resources and state with minimal dependence on other providers (See Provider Autonomy).
  • Registration: Providers register themselves with the SLOP server during initialization, making their capabilities discoverable.
  • Interface Implementation: Providers must implement the providers.Provider interface defined in internal/providers/interfaces.go.
  • Capabilities: Providers expose functionalities through:
    • Tools: Discrete functions callable via the API or by LLMs.
    • Webhooks: Endpoints to receive notifications from external services.
    • Events: Publishing internal messages via a Pub/Sub system.
    • Jobs: Defining and executing background tasks.

Typical Structure:

A provider package (pkg/providers/<provider_name>/) usually contains:

  • adapter.go: The core implementation of the providers.Provider interface, handling registration and interaction with the SLOP server.
  • client.go: (Optional) Code for interacting with external APIs or services.
  • models/: (Optional) Go structs defining data structures specific to the provider.
  • services/: (Optional) Defines the provider's services, each with its associated tool capabilities.
  • operations.go / services/: (Optional) Implementation of the provider's core logic and tool functions.
  • *_webhook_handler.go: (Optional) Handlers for incoming webhooks.
  • *_job.go: (Optional) Definitions and handlers for background jobs.
  • store.go / pg_store.go: (Optional) Interface and implementation for provider-specific data persistence.
  • config.go: (Optional) Loading and managing provider-specific configuration.
graph TD
    subgraph SLOP Server Core
        direction LR
        A[API Layer] --> B{Provider Registry};
        B --> C[Tool Registry];
        B --> D[Webhook Registry];
        B --> E[Job Manager];
        B --> F[Pub/Sub System];
        B --> G[Resource Manager];
    end

    subgraph Provider Module (pkg/providers/my_provider)
        direction TB
        H(adapter.go) -- Implements --> I{providers.Provider Interface};
        H -- Registers --> C;
        H -- Registers --> D;
        H -- Registers --> E;
        H -- Publishes --> F;
        H -- Uses --> G;
        H -- Uses --> J[client.go];
        H -- Uses --> K[operations/services];
        H -- Uses --> L[store.go];
        H -- Uses --> M[config.go];
        J -- Interacts --> N[External Service API];
    end

    I -- Defines Methods --> H;
    A -- Executes Tool --> H;
    N -- Sends Webhook --> D;
    F -- Notifies --> E; 

This architecture promotes separation of concerns and allows the SLOP server to be extended without modifying its core components.

Choose a unique provider name that follows Go package naming conventions.

For your pkg/providers/ path, package names should be:

  • All lowercase
  • A single word (no underscores or hyphens)
  • Short and concise, but clear and descriptive
  • Free of spaces and special characters

Valid characters for Go package names include:

  • Lowercase letters (a-z)
  • Numbers (0-9), but the name cannot start with a number
  • The underscore (_), which is technically allowed but discouraged in standard Go style

Examples of valid package names for your structure:

  • pkg/providers/aws
  • pkg/providers/gcp
  • pkg/providers/azure
  • pkg/providers/mongo
  • pkg/providers/redis
  • pkg/providers/postgres
  • pkg/providers/auth0

Examples of invalid or discouraged package names:

  • pkg/providers/AWS (uppercase not allowed)
  • pkg/providers/aws-client (hyphens not allowed)
  • pkg/providers/aws_client (underscores technically allowed but discouraged)
  • pkg/providers/aws.client (periods not allowed)
  • pkg/providers/1redis (cannot start with a number)
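
The naming rules above can be checked mechanically. The following is a minimal sketch, not part of the SLOP server: the function name IsValidProviderPackageName is hypothetical, and the check deliberately rejects underscores because this guide discourages them, even though the Go compiler accepts them.

```go
package main

import (
	"fmt"
	"regexp"
)

// validPackageName matches names that start with a lowercase letter and
// continue with lowercase letters or digits only. Underscores are legal Go
// but are rejected here because the guidance above discourages them.
var validPackageName = regexp.MustCompile(`^[a-z][a-z0-9]*$`)

// IsValidProviderPackageName (hypothetical helper) reports whether name
// follows the provider package naming rules.
func IsValidProviderPackageName(name string) bool {
	return validPackageName.MatchString(name)
}

func main() {
	names := []string{"aws", "auth0", "AWS", "aws-client", "aws_client", "aws.client", "1redis"}
	for _, name := range names {
		fmt.Printf("%-12s valid=%t\n", name, IsValidProviderPackageName(name))
	}
}
```

A check like this could run in a linter or a pre-commit hook against the last element of each provider directory path.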

Provider Autonomy

Maintaining autonomy between providers is a core principle in the SLOP Server architecture. This ensures that providers are decoupled, modular, and easier to maintain and update independently.

Guideline: Provider packages (e.g., pkg/providers/unipile) MUST remain autonomous and decoupled from one another.

Core Principles:

  1. When creating or editing a provider package pkg/providers/A, modify only the directory pkg/providers/A. Do not change anything outside that scope, especially internal/. Use the existing interfaces and structure provided.
  2. No Direct Cross-Provider Imports: A provider package (e.g., pkg/providers/A) MUST NOT directly import another specific provider package (e.g., pkg/providers/B). The only exception is that provider's models package (pkg/providers/B/models). To keep this exception safe, a models package (pkg/providers/B/models) MUST NOT itself import pkg/providers/B or any other package under pkg/providers/.
  3. No Direct Cross-Provider Calls: A provider package MUST NOT directly call functions or methods defined in another specific provider package.
  4. Communication via Abstraction: Interaction between providers or triggering actions in one provider based on an event in another MUST occur through abstracted mechanisms:
    • Pub/Sub: Providers should publish generic events (e.g., providerA.entity.created) to a central pub/sub system. Other components or dedicated listeners (potentially outside the provider packages) can subscribe to these events and trigger subsequent actions (like creating a job for another provider).
    • Job Manager: Workflow orchestration should be handled by creating jobs via the central jobmanager, triggered by event listeners or external orchestrators, not directly by another provider.
    • Resource Manager: Providers can interact with shared data by creating/reading resources via the central resources manager. Event payloads can reference resource IDs.
  5. Shared Models/Interfaces: Providers MAY import shared, generic packages like internal core types, interfaces (internal/providers, internal/core, internal/jobmanager, internal/resources, internal/pubsub), or common model definitions if necessary, but NOT specific implementations of other providers.
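
The import rule in principle 2 is simple enough to express as a check. The sketch below is an illustration only: ViolatesAutonomy is a hypothetical helper, the module path is copied from the examples in this guide, and the check covers only imports made by a provider package (it does not enforce the restriction on what a models package may import).

```go
package main

import (
	"fmt"
	"strings"
)

// providersRoot is the import-path prefix shared by all provider packages.
// The module path is taken from the examples in this guide.
const providersRoot = "gitlab.com/webigniter/slop-server/pkg/providers/"

// ViolatesAutonomy (hypothetical helper) reports whether a file belonging to
// provider `owner` breaks the cross-provider import rule by importing importPath.
func ViolatesAutonomy(owner, importPath string) bool {
	if !strings.HasPrefix(importPath, providersRoot) {
		return false // stdlib, internal/..., or third-party package
	}
	parts := strings.Split(strings.TrimPrefix(importPath, providersRoot), "/")
	if parts[0] == owner {
		return false // a provider may import its own sub-packages
	}
	// Another provider: only its models package is an allowed import.
	isModels := len(parts) >= 2 && parts[1] == "models"
	return !isModels
}

func main() {
	imports := []string{
		providersRoot + "providerB",
		providersRoot + "providerB/models",
		providersRoot + "providerA/services",
		"gitlab.com/webigniter/slop-server/internal/pubsub",
	}
	for _, path := range imports {
		fmt.Printf("%-70s violation=%t\n", path, ViolatesAutonomy("providerA", path))
	}
}
```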

Rationale:

  • Reduced Complexity: Prevents tangled dependencies and makes the system easier to understand.
  • Improved Maintainability: Changes within one provider are less likely to break others.
  • Enhanced Testability: Providers can be tested in isolation more effectively.
  • Flexibility: Allows providers to be added, removed, or replaced with minimal impact on the rest of the system.

Example Violation (Incorrect):

// pkg/providers/providerA/adapter.go
package providerA

import (
    // ... other imports
    providerB "gitlab.com/webigniter/slop-server/pkg/providers/providerB" // <-- VIOLATION: Direct import
    // Allowed exception:
    BModels "gitlab.com/webigniter/slop-server/pkg/providers/providerB/models" // <-- VALID only if models is a standalone package
)

func (a *AdapterA) HandleSomeEvent(...) error {
    // ... process event ...
    // Directly call a function in Provider B
    err := providerB.DoSomethingSpecific(...) // <-- VIOLATION: Direct call
    // ...
}

Example Correct Implementation (Using Pub/Sub and Jobs):

// pkg/providers/providerA/adapter.go
package providerA

import (
    "context"
    "log"

    "gitlab.com/webigniter/slop-server/internal/pubsub" // provides the Publisher type behind a.notifier
    // NO import for providerB
)

const EventProviderACompleted = "providerA.task.completed"

func (a *AdapterA) HandleSomeEvent(ctx context.Context, ...) error {
    // ... process event ...
    resultData := map[string]interface{}{ "key": "value" }

    // Publish a generic event
    if err := a.notifier.Publish(ctx, EventProviderACompleted, resultData); err != nil {
         log.Printf("ERROR publishing event %s: %v", EventProviderACompleted, err)
    }
    // DO NOT call Provider B directly.
    return nil
}

// --- Elsewhere (e.g., internal/listeners/listeners.go or via Event Trigger Config) ---

import (
    "context"

    "gitlab.com/webigniter/slop-server/internal/jobmanager"
    // Potentially import providerB's job registration details if needed
)

func handleProviderACompletion(ctx context.Context, eventPayload map[string]interface{}) {
    // Extract necessary info from eventPayload
    // Get Job Manager instance (dependency injection)
    jobMgr := getJobManagerFromContext(ctx)

    // Schedule a job defined by Provider B
    jobParams := map[string]interface{}{ /* derived from eventPayload */ }
    job := jobmanager.Job{ Type: "providerB.process_data", Parameters: jobParams }
    _, err := jobMgr.ScheduleJob(ctx, job)
    if err != nil { /* ... handle error ... */ }
}

By adhering to these principles, the SLOP Server maintains a clean, scalable, and robust architecture.

Provider Registration

For a provider to be recognized and utilized by the SLOP Server, it must be registered during the server's initialization phase. This process makes the provider's capabilities (tools, webhooks, jobs, events) available to the rest of the system.

Mechanism:

  1. Implementation: The provider must implement the providers.Provider interface.
  2. Instantiation: A factory function (e.g., NewProvider()) is typically defined within the provider's package to create an instance of the provider's main struct (adapter).
  3. Registration Call: The core of the registration happens in the pkg/providers/providers.go file. An init() function within this package calls providers.RegisterProvider() for each provider that should be included in the server build.

Steps for Developers:

  1. Create Provider Instance: Ensure your provider package has a constructor function (e.g., func NewMyProvider() *MyProvider).
  2. Special client providers (pkg/providers/client): Providers in this scope declare special tools that are handled internally by the client. They are invoked like any other tool within a function call, but instead of calling the SLOP server API, the client calls a local function, such as ask_user, to interact with the user and halt a workflow until the user supplies more information.

Initialization Sequence:

When the SLOP server starts:

  1. The server runs GetAllProviders() to initialize all providers.
  2. The central providers.Registry stores instances of each registered provider.
  3. During server setup, the core system iterates through the registered providers:
    • Calls the Initialize() method on each provider.
    • Calls RegisterTools(), RegisterWebhooks(), RegisterJobs(), RegisterEventDefinitions() (if implemented) on each provider, passing the respective registries (ToolRegistry, WebhookRegistry, JobRegistry, EventRegistry).

This ensures that all provider capabilities are properly configured and made available before the server starts accepting requests.
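
The sequence above can be pictured with a reduced sketch. Everything here is a hypothetical stand-in rather than the real server code: the Provider and ToolRegisterer interfaces, the ToolRegistry struct, and InitializeAll only mirror the shape of the described flow (initialize each provider, then register optional capabilities if the provider implements them).

```go
package main

import "fmt"

// Provider is a reduced, hypothetical stand-in for providers.Provider.
type Provider interface {
	Name() string
	Initialize() error
}

// ToolRegistry is a hypothetical stand-in that only records tool IDs.
type ToolRegistry struct{ IDs []string }

// ToolRegisterer models an optional capability, detected via type assertion.
type ToolRegisterer interface {
	RegisterTools(r *ToolRegistry) error
}

// InitializeAll initializes each provider and then registers its tools,
// stopping at the first error so the server never starts half-configured.
func InitializeAll(all []Provider, tools *ToolRegistry) error {
	for _, p := range all {
		if err := p.Initialize(); err != nil {
			return fmt.Errorf("initialize %s: %w", p.Name(), err)
		}
		if tr, ok := p.(ToolRegisterer); ok {
			if err := tr.RegisterTools(tools); err != nil {
				return fmt.Errorf("register tools for %s: %w", p.Name(), err)
			}
		}
	}
	return nil
}

// echoProvider is a toy provider used only for this demonstration.
type echoProvider struct{ initialized bool }

func (e *echoProvider) Name() string      { return "echo" }
func (e *echoProvider) Initialize() error { e.initialized = true; return nil }
func (e *echoProvider) RegisterTools(r *ToolRegistry) error {
	r.IDs = append(r.IDs, "echo.say")
	return nil
}

func main() {
	tools := &ToolRegistry{}
	if err := InitializeAll([]Provider{&echoProvider{}}, tools); err != nil {
		fmt.Println("startup failed:", err)
		return
	}
	fmt.Println("registered tools:", tools.IDs)
}
```

Webhooks, jobs, and event definitions would follow the same pattern, each with its own optional interface and registry.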

Tools

Tools are discrete functions exposed by providers that allow users, LLMs, or other system components to interact with the provider's capabilities. They form a primary mechanism for extending SLOP server functionality.

Purpose:

  • To provide specific, callable actions related to the provider's domain (e.g., unipile.get_email, filesystem.write_file).
  • To enable integration with LLMs, which can select and execute tools based on user requests.
  • To offer a structured way to interact with provider services via the SLOP API.

Definition:

Tools are defined within the provider using the providers.Tool struct. Key fields include:

  • ID: A unique identifier for the tool, typically following the format {ProviderName}.{tool_name} (e.g., my_provider.example_tool).
  • Name: A human-readable name for the tool.
  • Description: A clear explanation of what the tool does, intended for both developers and LLMs.
  • Provider: The name of the provider offering this tool.
  • Parameters: Holds the definition of the parameters the tool accepts. This field is defined in the providers.ProviderCapability struct as map[string]providers.Parameter. Each key is the parameter name, and the value is a providers.Parameter struct detailing its type, description, requirement status, etc. The provider constructs this map within its GetCapabilities() method.

Tool Parameters - Actual Structure:

The Parameters field is defined as map[string]providers.Parameter within the providers.ProviderCapability struct. This map holds the definitions for each parameter the tool accepts. The provider constructs this map within its GetCapabilities() method for each capability.

Example Parameter Definition (using map[string]providers.Parameter):

When defining a capability internally, the provider constructs the Parameters map like this, using the providers.Parameter struct:

// Example within a provider's GetCapabilities() method
func (s *SomeService) GetCapabilities() []providers.ProviderCapability {
    return []providers.ProviderCapability{
        {
            Name:        "example_tool",
            Description: "An example tool description.",
            Category:    "Example Category",
            Parameters: map[string]providers.Parameter{
                "param1": {
                    Type:        "string",
                    Description: "Description of the first parameter.",
                    Required:    true,
                    Category:    "path", // or "query", "body", etc.
                },
                "param2": {
                    Type:        "integer",
                    Description: "Description of the second parameter (optional).",
                    Required:    false,
                    Category:    "query",
                },
                // Add more parameters as needed
            },
            // Examples: ... (Optional)
        },
        // ... other capabilities
    }
}

This map (or a similar structure) is then assigned to the Parameters field of the providers.Tool struct when the tool is registered.
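
A parameter map of this shape can also drive argument validation before a tool runs. The sketch below is an assumption about how that might look, not the server's actual behavior: the local Parameter struct copies only the fields shown above, and ValidateParams and matchesType are hypothetical helpers.

```go
package main

import "fmt"

// Parameter mirrors the fields shown for providers.Parameter above.
type Parameter struct {
	Type        string
	Description string
	Required    bool
	Category    string
}

// ValidateParams (hypothetical helper) checks that required parameters are
// present and that supplied values match the declared basic type.
func ValidateParams(defs map[string]Parameter, args map[string]interface{}) error {
	for name, def := range defs {
		val, present := args[name]
		if !present {
			if def.Required {
				return fmt.Errorf("missing required parameter %q", name)
			}
			continue
		}
		if !matchesType(def.Type, val) {
			return fmt.Errorf("parameter %q: expected %s, got %T", name, def.Type, val)
		}
	}
	return nil
}

// matchesType compares a decoded value against a declared type name.
func matchesType(typ string, v interface{}) bool {
	switch typ {
	case "string":
		_, ok := v.(string)
		return ok
	case "integer":
		// JSON numbers decode to float64, so accept only whole values.
		switch n := v.(type) {
		case int:
			return true
		case float64:
			return n == float64(int64(n))
		}
		return false
	case "boolean":
		_, ok := v.(bool)
		return ok
	default:
		return true // unknown type names are not checked in this sketch
	}
}

func main() {
	defs := map[string]Parameter{
		"param1": {Type: "string", Required: true},
		"param2": {Type: "integer"},
	}
	fmt.Println(ValidateParams(defs, map[string]interface{}{"param1": "a", "param2": float64(3)}))
	fmt.Println(ValidateParams(defs, map[string]interface{}{"param2": float64(3)}))
}
```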

Registration:

Providers register their tools with the central ToolRegistry during the server initialization sequence by implementing the RegisterTools method:

// Example Registration using the actual pattern (based on unipile)
func (p *MyProvider) RegisterTools(registry providers.ToolRegistry) error {
    // Define the parameter schema (e.g., as a map)
    exampleToolParams := map[string]interface{}{
        "type": "object",
        "properties": map[string]interface{}{
            "param1": map[string]interface{}{
                "type":        "string",
                "description": "First parameter",
            },
        },
        "required": []string{"param1"},
    }

    // Register the tool, assigning the schema map to the Parameters field
    err := registry.RegisterTool(providers.Tool{
        ID:          fmt.Sprintf("%s.example_tool", ProviderName),
        Name:        "Example Tool",
        Description: "An example tool demonstrating correct parameter definition.",
        Provider:    ProviderName,
        Parameters:  exampleToolParams, // Assign the map directly
        // Examples: []interface{}{...} // Optionally add examples
    })
    if err != nil {
        return fmt.Errorf("failed to register tool: %w", err)
    }
    // ... register other tools ...
    return nil
}

Execution:

Tool execution is handled by the provider's ExecuteTool method. The SLOP server routes execution requests to the appropriate provider based on the tool ID.

// Example from Provider Development Guide
func (p *MyProvider) ExecuteTool(ctx context.Context, toolID string, params map[string]interface{}) (interface{}, error) {
    switch toolID {
    case fmt.Sprintf("%s.example_tool", ProviderName):
        // Extract and validate parameters from the 'params' map
        param1, ok := params["param1"].(string)
        if !ok {
            return nil, fmt.Errorf("missing or invalid 'param1' parameter")
        }

        // ... implement tool logic using param1 ...
        result := map[string]interface{}{"message": "Success!", "param1": param1}
        return result, nil
    default:
        return nil, fmt.Errorf("unknown tool ID: %s", toolID)
    }
}
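
Routing by tool ID, as described above, could work along these lines. This is a sketch under assumptions: the Executor interface, the Route function, and the lookup map are hypothetical, and the real server may resolve providers differently. It relies only on the {ProviderName}.{tool_name} ID format stated earlier.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Executor is a hypothetical stand-in for the ExecuteTool part of a provider.
type Executor interface {
	ExecuteTool(ctx context.Context, toolID string, params map[string]interface{}) (interface{}, error)
}

// Route splits "{provider}.{tool}" at the first dot, looks up the provider,
// and delegates execution to it.
func Route(ctx context.Context, byName map[string]Executor, toolID string, params map[string]interface{}) (interface{}, error) {
	parts := strings.SplitN(toolID, ".", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return nil, fmt.Errorf("malformed tool ID %q, want {provider}.{tool}", toolID)
	}
	p, ok := byName[parts[0]]
	if !ok {
		return nil, fmt.Errorf("no provider registered for tool ID %q", toolID)
	}
	return p.ExecuteTool(ctx, toolID, params)
}

// upperProvider is a toy provider exposing a single tool, text.upper.
type upperProvider struct{}

func (upperProvider) ExecuteTool(_ context.Context, toolID string, params map[string]interface{}) (interface{}, error) {
	if toolID != "text.upper" {
		return nil, fmt.Errorf("unknown tool ID: %s", toolID)
	}
	s, ok := params["input"].(string)
	if !ok {
		return nil, fmt.Errorf("missing or invalid 'input' parameter")
	}
	return strings.ToUpper(s), nil
}

// routeDemo wires one provider into a registry and routes a single call.
func routeDemo(toolID, input string) (interface{}, error) {
	registry := map[string]Executor{"text": upperProvider{}}
	return Route(context.Background(), registry, toolID, map[string]interface{}{"input": input})
}

func main() {
	out, err := routeDemo("text.upper", "hello")
	fmt.Println(out, err)
}
```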

API Access:

  • Discovery (Public):
    • GET /tools: Lists all registered tools.
    • GET /tools/:tool_id: Retrieves the detailed definition (including parameters/schema) of a specific tool.
  • Execution (Private - Requires Auth):
    • POST /api/tools/:tool_id: Executes the specified tool with provided arguments in the request body.

Tools are fundamental building blocks for provider functionality, enabling both automated processes and interactive use cases within the SLOP ecosystem.

Webhooks (Incoming)

Webhooks provide a mechanism for external services to send asynchronous notifications or data to the SLOP Server. Providers can register specific endpoints to listen for these incoming requests.

Purpose:

  • To allow external systems (e.g., SaaS platforms, Git repositories, payment gateways) to push information to SLOP in real-time or near real-time.
  • To trigger internal workflows or data processing based on external events.

Mechanism:

  1. External Service Configuration: The external service is configured to send HTTP POST requests to a specific URL provided by the SLOP server.
  2. Provider Implementation: The provider implements the providers.WebhookRegisteringProvider interface.
  3. Registration: During initialization, the provider registers a specific path (e.g., /webhooks/{provider}/{webhook_id}) and a corresponding handler function using the providers.WebhookRegistry.
  4. Handling: When an HTTP request hits a registered webhook path, the SLOP server routes it to the provider's designated handler function.
  5. Processing: The handler function is responsible for:
    • Reading and parsing the request body (often JSON, but can vary).
    • Validating the payload (and potentially verifying signatures if implemented).
    • Performing minimal immediate processing.
    • Crucially: For any non-trivial processing, the handler should schedule a background job using the JobManager to avoid blocking the webhook response and ensure reliability. The job can be passed necessary information (like the payload or a reference to it).
    • Returning an appropriate HTTP status code quickly (e.g., 200 OK for immediate success, 202 Accepted if a job was scheduled).
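
The processing steps above can be sketched as a standard net/http handler. The JobScheduler interface here is a hypothetical, reduced view of the Job Manager (the real ScheduleJob takes a jobmanager.Job), and signature verification is left out; the sketch only shows the parse, schedule, and respond-quickly shape.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// JobScheduler is a hypothetical, reduced view of the Job Manager.
type JobScheduler interface {
	ScheduleJob(ctx context.Context, jobType string, params map[string]interface{}) (string, error)
}

// NewWebhookHandler returns a handler that validates the payload, schedules a
// background job, and answers 202 Accepted without doing the work inline.
func NewWebhookHandler(jobs JobScheduler, jobType string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) // read at most 1 MiB
		if err != nil {
			http.Error(w, "cannot read body", http.StatusBadRequest)
			return
		}
		var payload map[string]interface{}
		if err := json.Unmarshal(body, &payload); err != nil {
			http.Error(w, "invalid JSON payload", http.StatusBadRequest)
			return
		}
		jobID, err := jobs.ScheduleJob(r.Context(), jobType, map[string]interface{}{"payload": payload})
		if err != nil {
			http.Error(w, "could not schedule job", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusAccepted)
		_ = json.NewEncoder(w).Encode(map[string]string{"status": "processing_scheduled", "job_id": jobID})
	}
}

// fakeScheduler records scheduled job types instead of running anything.
type fakeScheduler struct{ scheduled []string }

func (f *fakeScheduler) ScheduleJob(_ context.Context, jobType string, _ map[string]interface{}) (string, error) {
	f.scheduled = append(f.scheduled, jobType)
	return fmt.Sprintf("job-%d", len(f.scheduled)), nil
}

// deliver sends body to the handler and reports the status code and the job
// types that were scheduled as a result.
func deliver(body string) (int, []string) {
	jobs := &fakeScheduler{}
	h := NewWebhookHandler(jobs, "my_provider.process_data")
	req := httptest.NewRequest(http.MethodPost, "/webhooks/my_provider/example", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h(rec, req)
	return rec.Code, jobs.scheduled
}

func main() {
	code, scheduled := deliver(`{"id":"42"}`)
	fmt.Println(code, scheduled)
}
```

Keeping the scheduler behind a small interface lets the handler be tested with httptest and a fake, as in deliver, without a running Job Manager.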

Webhook Flow Diagram:

sequenceDiagram
    participant ExternalService
    participant SLOP_Server
    participant ProviderWebhookHandler
    participant JobManager
    participant BackgroundJob

    ExternalService->>+SLOP_Server: POST /webhooks/{provider}/{webhook_id} (Payload)
    SLOP_Server->>+ProviderWebhookHandler: Invoke handler(request)
    ProviderWebhookHandler->>ProviderWebhookHandler: Validate/Parse Payload
    alt Minimal Processing (Not Recommended for complex tasks)
        ProviderWebhookHandler-->>-SLOP_Server: 2xx OK Response
    else Background Processing Needed (Recommended)
        ProviderWebhookHandler->>+JobManager: ScheduleJob(JobType, Payload/PayloadRef)
        JobManager-->>-ProviderWebhookHandler: JobID
        ProviderWebhookHandler-->>-SLOP_Server: 202 Accepted (JobID)
        JobManager->>+BackgroundJob: Execute Job(Payload/PayloadRef)
        BackgroundJob->>BackgroundJob: Perform processing
        BackgroundJob-->>-JobManager: Job Complete/Failed
    end
    SLOP_Server-->>-ExternalService: Response (2xx)

API Access:

  • POST /webhooks/:provider/:webhook_id: The public endpoint where external services send webhook data. The exact path is determined during provider registration.

Webhooks are essential for integrating SLOP with external systems that initiate communication.

Events (Internal Pub/Sub)

Beyond receiving external webhooks, providers can publish internal events using the SLOP Server's Pub/Sub (Publish/Subscribe) system. This mechanism facilitates decoupled communication between different parts of the server, including between providers or between providers and core services.

Purpose:

  • To notify other components about significant occurrences within a provider (e.g., unipile.email.received, myprovider.document.processed).
  • To trigger subsequent actions or workflows in a decoupled manner (See Provider Chaining).
  • To enable internal automation via Event Triggers.
  • To allow external clients to subscribe to server events via Webhook Subscriptions (though the client interacts with the subscription API, not the internal Pub/Sub directly).

Mechanism:

The internal Pub/Sub system relies on the pubsub.Publisher interface (defined in internal/pubsub/interface.go). Providers that need to publish events should receive an implementation of this interface via dependency injection during their initialization.

Workflow for Publishing Events:

  1. Dependency: Ensure your provider's adapter struct includes a field for pubsub.Publisher and receives it in its constructor.
    // Example Adapter Struct
    type MyProviderAdapter struct {
        // ... other fields
        notifier pubsub.Publisher
    }
    
    // Example Constructor
    func NewMyProviderAdapter(notifier pubsub.Publisher /*, ... other deps */) *MyProviderAdapter {
        return &MyProviderAdapter{ notifier: notifier /*, ... */}
    }
    
  2. Define Event Channel: Choose a unique, descriptive channel name, typically following the pattern provider_name.resource_type.action (e.g., unipile.account.sync_status.updated, email_draft_preparator.draft.prepared).
  3. Define Payload Struct: Create a Go struct representing the data for the event. This struct should be serializable to JSON and contain all necessary information for consumers.
    // Example Payload Struct
    type MyProviderDocumentProcessedEvent struct {
        DocumentID       string            `json:"documentId"`
        Status           string            `json:"status"`
        ProcessedFileURL string            `json:"processedFileUrl,omitempty"`
        Metadata         map[string]string `json:"metadata,omitempty"`
        Timestamp        time.Time         `json:"timestamp"`
    }
    
  4. Publish: Call the notifier.Publish(ctx, channelName, payloadStruct) method where the event occurs.
    func (a *MyProviderAdapter) completeProcessing(ctx context.Context, docID string, url string) error {
        // ... processing logic ...
    
        eventPayload := MyProviderDocumentProcessedEvent{
            DocumentID:       docID,
            Status:           "completed",
            ProcessedFileURL: url,
            Timestamp:        time.Now(),
        }
        channel := "myprovider.document.processed"
    
        log.Printf("Publishing event '%s' for document %s", channel, docID)
        err := a.notifier.Publish(ctx, channel, eventPayload)
        if err != nil {
            log.Printf("ERROR publishing event '%s': %v", channel, err)
            // Decide how to handle publish errors (retry, log, ignore?)
        }
        return err // Or nil depending on criticality
    }
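
Because the adapter depends only on a publisher interface, publishing code can be unit-tested with an in-memory fake. The sketch below assumes the interface has the single Publish method used in the examples above; the real pubsub.Publisher may declare more. MemoryPublisher, Published, and publishDemo are hypothetical names.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
)

// Publisher mirrors the Publish call used above; the real pubsub.Publisher
// interface may declare additional methods.
type Publisher interface {
	Publish(ctx context.Context, channel string, payload interface{}) error
}

// MemoryPublisher (hypothetical test double) records events in memory.
type MemoryPublisher struct {
	mu     sync.Mutex
	events map[string][]string
}

func NewMemoryPublisher() *MemoryPublisher {
	return &MemoryPublisher{events: make(map[string][]string)}
}

// Publish marshals the payload to JSON, as a real publisher would have to,
// and stores the encoded event under its channel.
func (m *MemoryPublisher) Publish(_ context.Context, channel string, payload interface{}) error {
	data, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal payload for %s: %w", channel, err)
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.events[channel] = append(m.events[channel], string(data))
	return nil
}

// Published returns a copy of the JSON events recorded for channel.
func (m *MemoryPublisher) Published(channel string) []string {
	m.mu.Lock()
	defer m.mu.Unlock()
	return append([]string(nil), m.events[channel]...)
}

// Compile-time check that the fake satisfies the interface.
var _ Publisher = (*MemoryPublisher)(nil)

type documentProcessed struct {
	DocumentID string `json:"documentId"`
	Status     string `json:"status"`
}

// publishDemo publishes one payload and returns what was recorded.
func publishDemo(payload interface{}) ([]string, error) {
	pub := NewMemoryPublisher()
	err := pub.Publish(context.Background(), "myprovider.document.processed", payload)
	return pub.Published("myprovider.document.processed"), err
}

func main() {
	events, err := publishDemo(documentProcessed{DocumentID: "doc-1", Status: "completed"})
	fmt.Println(events, err)
}
```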
    

Payload Handling:

  • Serialization: The payload struct passed to Publish must be marshallable to JSON using encoding/json.
  • Large Payloads: The system (specifically the internal/pubsub/postgres implementation) automatically handles large payloads. If the marshalled JSON exceeds a certain size (~7.5KB), it's transparently stored using the resources.Manager, and only a resource reference is published internally. Providers do not need to manually check payload size before publishing.
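
To make the large-payload behavior concrete, here is a sketch of the kind of decision the Pub/Sub layer is described as making. It is not the internal/pubsub/postgres implementation: the 7680-byte threshold is an assumed reading of "~7.5KB", and ResourceStore, Envelope, and BuildEnvelope are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// maxInlineBytes approximates the ~7.5KB limit mentioned above; the exact
// value used by the server is an assumption here.
const maxInlineBytes = 7680

// ResourceStore is a hypothetical stand-in for resources.Manager.
type ResourceStore interface {
	Save(data []byte) (id string, err error)
}

// Envelope is what would be published: the payload inline, or a reference.
type Envelope struct {
	Inline     json.RawMessage `json:"inline,omitempty"`
	ResourceID string          `json:"resource_id,omitempty"`
}

// BuildEnvelope marshals payload and stores it out of band when it is too
// large to publish inline.
func BuildEnvelope(store ResourceStore, payload interface{}) (Envelope, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return Envelope{}, fmt.Errorf("marshal payload: %w", err)
	}
	if len(data) <= maxInlineBytes {
		return Envelope{Inline: data}, nil
	}
	id, err := store.Save(data)
	if err != nil {
		return Envelope{}, fmt.Errorf("store large payload: %w", err)
	}
	return Envelope{ResourceID: id}, nil
}

// memStore is an in-memory ResourceStore for the demonstration.
type memStore struct{ saved map[string][]byte }

func newMemStore() *memStore { return &memStore{saved: make(map[string][]byte)} }

func (m *memStore) Save(data []byte) (string, error) {
	id := fmt.Sprintf("res-%d", len(m.saved)+1)
	m.saved[id] = data
	return id, nil
}

// envelopeFor builds an envelope for a payload whose body is n bytes long.
func envelopeFor(store ResourceStore, n int) (Envelope, error) {
	return BuildEnvelope(store, map[string]string{"body": strings.Repeat("x", n)})
}

func main() {
	store := newMemStore()
	small, _ := envelopeFor(store, 10)
	large, _ := envelopeFor(store, 10000)
	fmt.Printf("small inline=%t, large resource=%q\n", len(small.Inline) > 0, large.ResourceID)
}
```

Consumers would then resolve a resource reference back into the full payload before processing it.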

Event Discovery:

To make events discoverable and usable by API clients (for webhook subscriptions or triggers), providers should implement the providers.EventInfoProvider interface.

// Interface definition (internal/providers/interfaces.go)
type EventInfoProvider interface {
    // GetEventDefinitions returns basic info about published events.
    GetEventDefinitions() []EventDefinition
    // GetDetailedEventInfo returns detailed info (e.g., schema, examples) about events.
    GetDetailedEventInfo() []DetailedEventInfo
}

// EventDefinition describes an event published by a provider.
type EventDefinition struct {
    Channel        string      `json:"channel"`         // Unique event channel name (e.g., provider.resource.action)
    Description    string      `json:"description"`     // User-friendly description of the event
    SourceProvider string      `json:"source_provider"` // ID of the provider publishing the event
    PayloadSchema  interface{} `json:"payload_schema,omitempty"` // Optional: JSON Schema describing the payload structure
}

// DetailedEventInfo provides extended information about an event.
type DetailedEventInfo struct {
    Channel        string      `json:"channel"`
    Description    string      `json:"description"`
    SourceProvider string      `json:"source_provider"`
    PayloadSchema  interface{} `json:"payload_schema,omitempty"`
    // ExamplePayload interface{} `json:"example_payload,omitempty"` // Optional: Example payload
    // Notes          string      `json:"notes,omitempty"`           // Optional: Additional notes
}

// Example Provider Implementation (adapter.go)
var _ providers.EventInfoProvider = (*MyProvider)(nil)

func (p *MyProvider) GetEventDefinitions() []providers.EventDefinition {
    return []providers.EventDefinition{
        {
            Channel:        "myprovider.document.processed",
            Description:    "Triggered when a document has finished processing.",
            SourceProvider: ProviderName,
            // Optionally provide a JSON schema for the payload struct
        },
        // ... other events published by this provider
    }
}

// Example implementation for detailed info (can return nil if not needed)
func (p *MyProvider) GetDetailedEventInfo() []providers.DetailedEventInfo {
    // Return nil or populate detailed info based on EventDefinitions
    return nil
}

Registered event definitions can be queried via the following authenticated endpoint (still to be confirmed, including whether it returns the detailed info):

  • GET /api/events/definitions

Internal events are a powerful tool for building reactive and decoupled workflows within the SLOP Server.

Jobs (Background Processing)

For operations that might take a significant amount of time (e.g., processing large files, interacting with slow external APIs, complex computations), providers should utilize the SLOP Server's Job Manager to execute these tasks asynchronously in the background.

Purpose:

  • Prevent Blocking: Avoid tying up API request handlers or webhook listeners while long operations complete.
  • Improve Responsiveness: Allow the server to respond quickly to initial requests (e.g., with a 202 Accepted status and a Job ID).
  • Enhance Reliability: Jobs can be queued, managed, and potentially retried if they fail.
  • Resource Management: Allows the server to manage the execution of resource-intensive tasks more effectively.

Mechanism:

The system relies on a central jobmanager.JobManager and associated interfaces:

  • jobmanager.JobManager: Interface for scheduling and managing jobs.
  • jobmanager.JobRegistry: Interface used during initialization to register job types and their corresponding handler functions.
  • providers.JobRegisteringProvider: Interface that providers implement to register their job types.
  • jobmanager.Job: Struct representing a job instance, containing its type, parameters, schedule, status, etc.

Workflow:

  1. Define Job Type: Choose a unique string identifier for the job type, typically provider_name.job_action (e.g., my_provider.process_data, email_draft_preparator.process_email).
  2. Implement Handler: Create a function within the provider that performs the actual work. This function must match the signature func(ctx context.Context, params map[string]interface{}) error.
  3. Register Job Type: Implement the providers.JobRegisteringProvider interface and use the RegisterJobs method to associate the job type string with its handler function via the JobRegistry.
  4. Schedule Job: When a long-running task needs to be performed (often triggered by an API call or a webhook handler), obtain an instance of the JobManager (usually via context or dependency injection) and call ScheduleJob(ctx, job). The jobmanager.Job struct passed to this method includes the Type, Parameters (as map[string]interface{}), and desired Schedule (often time.Now() for immediate queuing).
  5. Execution: The Job Manager picks up queued jobs and executes their registered handler functions, passing the stored parameters.
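
The link between a job type string and its handler (steps 1 to 3 and 5) can be sketched with an in-memory registry. This is an illustration only: MemoryJobs and its Run method are hypothetical, and Run executes synchronously, whereas the real Job Manager queues jobs and runs them in the background. Only the handler signature is taken from step 2.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// JobHandler matches the handler signature described in step 2.
type JobHandler func(ctx context.Context, params map[string]interface{}) error

// MemoryJobs (hypothetical) maps job type strings to handlers and runs them
// synchronously, which is enough to unit-test a handler.
type MemoryJobs struct{ handlers map[string]JobHandler }

func NewMemoryJobs() *MemoryJobs {
	return &MemoryJobs{handlers: make(map[string]JobHandler)}
}

// RegisterJobType associates a job type with its handler, rejecting duplicates.
func (m *MemoryJobs) RegisterJobType(jobType string, h JobHandler) error {
	if h == nil {
		return errors.New("nil job handler")
	}
	if _, dup := m.handlers[jobType]; dup {
		return fmt.Errorf("job type %q already registered", jobType)
	}
	m.handlers[jobType] = h
	return nil
}

// Run looks up the handler for jobType and executes it with params.
func (m *MemoryJobs) Run(ctx context.Context, jobType string, params map[string]interface{}) error {
	h, ok := m.handlers[jobType]
	if !ok {
		return fmt.Errorf("unknown job type %q", jobType)
	}
	return h(ctx, params)
}

const jobTypeProcessData = "my_provider.process_data"

// processData validates its parameters; the real work is omitted.
func processData(_ context.Context, params map[string]interface{}) error {
	if _, ok := params["data_id"].(string); !ok {
		return errors.New("missing or invalid 'data_id' parameter")
	}
	return nil
}

// runDemoJob registers the demo handler and runs one job of the given type.
func runDemoJob(jobType string, params map[string]interface{}) error {
	jobs := NewMemoryJobs()
	if err := jobs.RegisterJobType(jobTypeProcessData, processData); err != nil {
		return err
	}
	return jobs.Run(context.Background(), jobType, params)
}

func main() {
	fmt.Println(runDemoJob(jobTypeProcessData, map[string]interface{}{"data_id": "abc"}))
	fmt.Println(runDemoJob("my_provider.unknown", nil))
}
```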

Implementation Details:

  • Registering Job Types (JobRegisteringProvider):
    // Interface definition (internal/providers/interfaces.go)
    type JobRegisteringProvider interface {
        RegisterJobs(registry jobmanager.JobRegistry) error
    }
    
    // Example Provider Implementation (adapter.go)
    var _ providers.JobRegisteringProvider = (*MyProvider)(nil)
    
    const JobTypeProcessData = "my_provider.process_data"
    
    func (p *MyProvider) RegisterJobs(registry jobmanager.JobRegistry) error {
        err := registry.RegisterJobType(JobTypeProcessData, p.processDataJob) // Link type string to handler func
        if err != nil {
            return fmt.Errorf("failed to register job %s: %w", JobTypeProcessData, err)
        }
        return nil
    }
    
  • Implementing Job Handler:
    // Example Handler (e.g., jobs.go)
    func (p *MyProvider) processDataJob(ctx context.Context, params map[string]interface{}) error {
        // 1. Extract and validate parameters
        dataID, ok := params["data_id"].(string)
        if !ok { return fmt.Errorf("missing or invalid 'data_id' parameter") }
    
        // 2. Perform the long-running task
        log.Printf("Starting job %s for data %s", JobTypeProcessData, dataID)
        // ... interact with external services, process data ...
        time.Sleep(30 * time.Second) // Simulate work
        log.Printf("Finished job %s for data %s", JobTypeProcessData, dataID)
    
        // 3. Return nil on success, or an error on failure
        return nil
    }
    
  • Scheduling a Job:
    // Example within a Webhook Handler or API Handler
    func (p *MyProvider) handleIncomingRequest(w http.ResponseWriter, r *http.Request) {
        // ... parse request, get dataID ...
        dataID := "some-data-identifier"
    
        // Get JobManager (example: from context)
        jobManager, ok := r.Context().Value(jobmanager.JobManagerKey).(jobmanager.JobManager)
        if !ok {
            // JobManager not found in the request context
            http.Error(w, "job manager unavailable", http.StatusInternalServerError)
            return
        }
    
        // Prepare job parameters
        jobParams := map[string]interface{}{"data_id": dataID}
    
        // Create and schedule the job
        job := jobmanager.Job{
            Type:       JobTypeProcessData,
            Parameters: jobParams,
            Schedule:   time.Now(), // Queue for immediate execution
            // Name, UserID etc. might be set automatically or manually
        }
        jobID, err := jobManager.ScheduleJob(r.Context(), job)
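        if err != nil {
            // Scheduling failed: report a server error instead of returning silently
            http.Error(w, "failed to schedule job", http.StatusInternalServerError)
            return
        }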
    
        // Respond to the original request
        w.WriteHeader(http.StatusAccepted)
        fmt.Fprintf(w, `{"status":"processing_scheduled", "job_id":"%s"}`, jobID)
    }
    

API Access (Private - Requires Auth):

  • POST /api/jobs: Manually create and queue a new background job.
  • GET /api/jobs: List background jobs (filterable by status, user).
  • GET /api/jobs/:jobID: Get details of a specific background job.
  • POST /api/jobs/:jobID/cancel: Request cancellation of a running job.
  • GET /api/jobs/:jobID/history: Get the status history of a specific job.

Using the Job Manager is crucial for building robust and scalable providers that handle potentially long-running tasks without impacting server performance.

Storage

Providers often need to persist data related to their operation, such as configuration, state, user credentials, or processed results. To maintain autonomy and testability, providers should manage their own storage through dedicated interfaces.

Purpose:

  • To store provider-specific data persistently.
  • To decouple the provider's logic from the specific storage backend (e.g., PostgreSQL, Redis, in-memory).
  • To facilitate easier testing by allowing mock storage implementations.

Mechanism:

  1. Define Storage Interface: Within the provider's package, define a Go interface (e.g., Store) that specifies the data access methods required by the provider (e.g., SaveItem, GetItem, DeleteItem).
  2. Implement Interface: Create one or more concrete implementations of the storage interface. A common implementation uses PostgreSQL, often utilizing the database/sql package or libraries like pgx.
  3. Dependency Injection: The provider's adapter struct should hold an instance of the storage interface. This instance (e.g., *PgStore) is typically created during server initialization (using the shared database connection pool) and injected into the provider's constructor.
  4. Usage: The provider's methods interact with the database solely through the defined storage interface methods.

Implementation Example:

  • Define Interface (store.go):
    package myprovider
    
    import "context"
    
    // Store defines the storage interface for MyProvider
    type Store interface {
        SaveItem(ctx context.Context, item Item) error
        GetItem(ctx context.Context, id string) (*Item, error)
        // ... other methods
    }
    
    // Item represents a data item
    type Item struct {
        ID   string
        Data map[string]interface{}
        // ... other fields
    }
    
  • Implement with PostgreSQL (pg_store.go):
    package myprovider
    
    import (
        "context"
        "database/sql" // Or pgxpool.Pool
        "encoding/json"
        "fmt"
    )
    
    var _ Store = (*PgStore)(nil) // Compile-time check
    
    type PgStore struct {
        db *sql.DB // Or *pgxpool.Pool
    }
    
    func NewPgStore(db *sql.DB) *PgStore {
        return &PgStore{db: db}
    }
    
    func (s *PgStore) SaveItem(ctx context.Context, item Item) error {
        dataJSON, err := json.Marshal(item.Data)
        if err != nil {
            return fmt.Errorf("failed to marshal item %s: %w", item.ID, err)
        }
    
        _, err = s.db.ExecContext(ctx,
            `INSERT INTO my_provider_items (id, data) VALUES ($1, $2) 
             ON CONFLICT (id) DO UPDATE SET data = $2`,
            item.ID, dataJSON)
        // ... handle error ...
        return err
    }
    
    func (s *PgStore) GetItem(ctx context.Context, id string) (*Item, error) {
        // ... query logic using s.db.QueryRowContext ...
    }
    // ... other methods
    
  • Inject into Adapter (adapter.go):
    package myprovider
    
    import (
        "context"
        "database/sql"
    )
    
    type MyProvider struct {
        store Store // Use the interface type
        // ... other fields
    }
    
    // Constructor receives the shared database handle and builds the concrete store
    func NewProvider(db *sql.DB) *MyProvider {
        store := NewPgStore(db) // Create the concrete store
        return &MyProvider{
            store: store, // Inject it
        }
    }
    
    func (p *MyProvider) SomeOperation(ctx context.Context, itemID string) error {
        // Use the store via the interface
        item, err := p.store.GetItem(ctx, itemID)
        if err != nil {
            return err
        }
        _ = item // ... operate on the item ...
        return nil
    }
    

Database Schema:

Providers requiring database storage are responsible for defining their own SQL table schemas. The corresponding migrations should be managed as part of the overall SLOP Server database migration strategy.

By abstracting storage operations behind interfaces, providers remain modular and adaptable to different deployment environments or changes in storage technology.
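
To illustrate the testing benefit mentioned above, here is a minimal sketch of an in-memory implementation of the `Store` interface. `MemStore`, `ErrNotFound`, and `roundTrip` are hypothetical names introduced for this example only; `Item` and `Store` are repeated from `store.go` so the snippet compiles on its own.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Item and Store mirror the definitions shown in store.go.
type Item struct {
	ID   string
	Data map[string]interface{}
}

type Store interface {
	SaveItem(ctx context.Context, item Item) error
	GetItem(ctx context.Context, id string) (*Item, error)
}

// ErrNotFound is a hypothetical sentinel error; a real provider may
// signal a missing item differently.
var ErrNotFound = errors.New("item not found")

var _ Store = (*MemStore)(nil) // Compile-time check

// MemStore keeps items in a map guarded by a mutex. It is meant for tests.
type MemStore struct {
	mu    sync.RWMutex
	items map[string]Item
}

func NewMemStore() *MemStore {
	return &MemStore{items: make(map[string]Item)}
}

// SaveItem inserts or overwrites the item, similar to the upsert in PgStore.
func (s *MemStore) SaveItem(ctx context.Context, item Item) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[item.ID] = item
	return nil
}

// GetItem returns a copy of the stored item or ErrNotFound.
func (s *MemStore) GetItem(ctx context.Context, id string) (*Item, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	item, ok := s.items[id]
	if !ok {
		return nil, ErrNotFound
	}
	return &item, nil
}

// roundTrip saves an item and reads it back using only the Store interface.
func roundTrip(store Store, item Item) (*Item, error) {
	ctx := context.Background()
	if err := store.SaveItem(ctx, item); err != nil {
		return nil, err
	}
	return store.GetItem(ctx, item.ID)
}

func main() {
	item := Item{ID: "item-1", Data: map[string]interface{}{"status": "new"}}
	got, err := roundTrip(NewMemStore(), item)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got.ID, got.Data["status"])
}
```

Because the provider depends only on `Store`, a test could pass a `MemStore` wherever production code passes a `PgStore`, with no database required.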

Configuration

Providers often require configuration settings to function correctly, such as API keys, service endpoints, timeouts, or feature flags. The SLOP Server provides mechanisms for providers to load and manage these settings.

Purpose:

  • To allow customization of provider behavior without modifying code.
  • To securely manage sensitive information like API keys or credentials.
  • To adapt provider functionality to different environments (development, staging, production).

Common Approaches:

  1. Environment Variables: The most common and recommended method, especially for sensitive data or settings that vary across environments. Environment variables are easily managed in containerized deployments and CI/CD pipelines.
  2. Configuration Files: Less common for individual providers in the SLOP context, as the main server configuration might handle broader settings. If used, providers might load specific sections from a central configuration file managed by the server.
  3. Database: For dynamic configuration that needs to be updated without restarting the server (though this is less typical for initial setup parameters like API keys).

Implementation Strategy:

  1. Define Config Struct: Create a Go struct within the provider package to hold all its configuration parameters.
  2. Loading Function: Implement a function (e.g., LoadConfig()) that reads values from the environment (or other sources) and populates the config struct. This function should handle defaults and validation.
  3. Initialization: Call the LoadConfig() function during the provider's initialization phase (e.g., within the NewProvider constructor or the Initialize method). The loaded config struct can then be stored within the provider's adapter struct.

Implementation Example:

  • Define Config Struct (config.go):
    package myprovider
    
    import (
        "fmt"
        "os"
        "strconv"
        "time"
    )
    
    // Config holds the provider configuration
    type Config struct {
        APIKey      string
        APIEndpoint string
        Timeout     time.Duration
        MaxRetries  int
    }
    
    // LoadConfig loads configuration from environment variables
    func LoadConfig() (*Config, error) {
        apiKey := os.Getenv("MY_PROVIDER_API_KEY")
        if apiKey == "" {
            return nil, fmt.Errorf("MY_PROVIDER_API_KEY environment variable is required")
        }
    
        endpoint := os.Getenv("MY_PROVIDER_API_ENDPOINT")
        if endpoint == "" {
            endpoint = "https://api.example.com/v1" // Default value
        }
    
        timeoutStr := os.Getenv("MY_PROVIDER_TIMEOUT_SECONDS")
        timeout := 30 * time.Second // Default value
        if timeoutStr != "" {
            timeoutSec, err := strconv.Atoi(timeoutStr)
            if err != nil {
                return nil, fmt.Errorf("invalid MY_PROVIDER_TIMEOUT_SECONDS: %w", err)
            }
            timeout = time.Duration(timeoutSec) * time.Second
        }
    
        retriesStr := os.Getenv("MY_PROVIDER_MAX_RETRIES")
        maxRetries := 3 // Default value
        if retriesStr != "" {
            var err error
            maxRetries, err = strconv.Atoi(retriesStr)
            if err != nil || maxRetries < 0 {
                return nil, fmt.Errorf("invalid MY_PROVIDER_MAX_RETRIES: %s", retriesStr)
            }
        }
    
        return &Config{
            APIKey:      apiKey,
            APIEndpoint: endpoint,
            Timeout:     timeout,
            MaxRetries:  maxRetries,
        }, nil
    }
    
  • Use in Provider (adapter.go):
    package myprovider
    
    import (
        "context"
        "fmt"
        "log"
    )
    
    type MyProvider struct {
        config *Config
        // ... other fields like store, notifier, etc.
    }
    
    func NewProvider(/* dependencies */) (*MyProvider, error) {
        cfg, err := LoadConfig()
        if err != nil {
            return nil, fmt.Errorf("failed to load my_provider config: %w", err)
        }
        log.Println("MyProvider configuration loaded successfully.")
    
        return &MyProvider{
            config: cfg,
            // ... initialize other fields
        }, nil
    }
    
    func (p *MyProvider) Initialize(ctx context.Context) error {
        // Use config values if needed during initialization
        log.Printf("Initializing MyProvider with endpoint: %s", p.config.APIEndpoint)
        // ...
        return nil
    }
    
    // Other methods can access config via p.config
    

Best Practices:

  • Environment Variables: Prefer environment variables for configuration, especially for secrets.
  • Defaults: Provide sensible default values for non-critical settings.
  • Clear Naming: Use clear and specific names for environment variables (e.g., PROVIDERNAME_SETTING_NAME).
  • Validation: Validate configuration values during loading to fail fast if settings are invalid.
  • Documentation: Clearly document all required and optional configuration settings for the provider.

Provider Chaining

Provider chaining is a powerful pattern within the SLOP Server that allows providers to collaborate by passing information sequentially. An upstream provider might generate an event (often triggered by an external webhook or internal process), which is then consumed by a downstream provider. The downstream provider processes the data and may, in turn, emit its own events for further consumption, creating a processing pipeline.

Purpose:

  • To build complex workflows by composing modular, single-purpose providers.
  • To decouple different stages of a process (e.g., data ingestion, transformation, notification).
  • To promote reusability of provider functionalities.

Mechanism:

Provider chaining typically relies on the interplay of Webhooks, Events, and Jobs:

  1. Initiation: An external event triggers a webhook handler in an upstream provider (Provider A), or an internal process in Provider A completes.
  2. Event Publication: Provider A publishes an internal event via the Pub/Sub system, containing relevant data or references (e.g., a resource ID).
  3. Event Consumption/Trigger:
    • A dedicated listener or an Event Trigger configured in the system detects the event published by Provider A.
    • This trigger schedules a background job for a downstream provider (Provider B).
  4. Job Execution: Provider B's job handler executes.
  5. Data Fetching (if needed): Provider B might need to fetch additional data, potentially by executing a tool exposed by Provider A (using the Provider Registry) or by accessing a shared resource (using the Resource Manager).
  6. Processing: Provider B performs its specific processing or transformation task.
  7. Further Publication (Optional): Provider B might publish its own event upon completion, allowing further chaining with Provider C, and so on.

Example: Unipile -> Email Draft Preparator

This chain processes incoming emails:

  1. Unipile (Provider A):
    • Receives a webhook from an external email service (e.g., Gmail) indicating a new email.
    • The webhook handler stores the raw payload as a resource.
    • Publishes an internal event unipile.email.received containing references like the account ID, email ID, and the resource ID of the raw payload.
  2. Event Trigger:
    • An event trigger is configured to listen for unipile.email.received.
    • When the event occurs, the trigger schedules a job of type email_draft_preparator.process_email.
  3. Email Draft Preparator (Provider B):
    • The process_email job handler executes.
    • It extracts the email ID and account ID from the job parameters (originating from the event payload).
    • It executes the unipile.get_email tool (belonging to Provider A) to fetch the full email details.
    • It processes the email content, potentially converting it to XML or another format.
    • It stores the processed result (e.g., as a new resource).
    • It publishes a new event, such as email.draft.prepared, containing the ID of the processed resource.
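
The flow above can be sketched with toy, in-process stand-ins for the Pub/Sub system and the Job Manager. Everything in this snippet (`Bus`, `JobQueue`, `runChain`, and the `email_id` parameter name) is hypothetical and synchronous for brevity; the real SLOP Server components are asynchronous and have their own APIs. Only the event and job type names come from the example.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a Pub/Sub message.
type Event struct {
	Topic   string
	Payload map[string]interface{}
}

// Bus is a toy synchronous Pub/Sub that records published topics.
type Bus struct {
	subscribers map[string][]func(Event)
	Published   []string // Topics in publication order
}

func NewBus() *Bus {
	return &Bus{subscribers: make(map[string][]func(Event))}
}

func (b *Bus) Subscribe(topic string, fn func(Event)) {
	b.subscribers[topic] = append(b.subscribers[topic], fn)
}

func (b *Bus) Publish(e Event) {
	b.Published = append(b.Published, e.Topic)
	for _, fn := range b.subscribers[e.Topic] {
		fn(e)
	}
}

// JobHandler and JobQueue are toy stand-ins for the Job Manager.
// Schedule runs the handler inline instead of queueing it.
type JobHandler func(params map[string]interface{}) error

type JobQueue struct {
	handlers map[string]JobHandler
}

func (q *JobQueue) Register(jobType string, h JobHandler) {
	q.handlers[jobType] = h
}

func (q *JobQueue) Schedule(jobType string, params map[string]interface{}) error {
	h, ok := q.handlers[jobType]
	if !ok {
		return fmt.Errorf("unknown job type %q", jobType)
	}
	return h(params)
}

// runChain wires Provider A -> event trigger -> Provider B and returns
// the topics that were published along the way.
func runChain(emailID string) ([]string, error) {
	bus := NewBus()
	jobs := &JobQueue{handlers: make(map[string]JobHandler)}
	var jobErr error

	// Provider B: the job handler processes the email and publishes its own event.
	jobs.Register("email_draft_preparator.process_email", func(params map[string]interface{}) error {
		id, ok := params["email_id"].(string)
		if !ok {
			return fmt.Errorf("missing or invalid 'email_id' parameter")
		}
		// ... fetch the email via Provider A's tool and process it ...
		bus.Publish(Event{Topic: "email.draft.prepared", Payload: map[string]interface{}{"email_id": id}})
		return nil
	})

	// Event trigger: schedule Provider B's job when Provider A's event arrives.
	bus.Subscribe("unipile.email.received", func(e Event) {
		jobErr = jobs.Schedule("email_draft_preparator.process_email", e.Payload)
	})

	// Provider A: the webhook handler publishes the initial event.
	bus.Publish(Event{Topic: "unipile.email.received", Payload: map[string]interface{}{"email_id": emailID}})
	return bus.Published, jobErr
}

func main() {
	topics, err := runChain("email-42")
	fmt.Println(topics, err)
}
```

Note that neither provider calls the other directly in this sketch: the only coupling is the event topic and the job type string, which is what keeps the stages independently replaceable.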

Provider Chaining Flow Diagram:

graph LR
    A[External Event Source] --> B(Provider A: Webhook/Process);
    B -- Publishes Event --> C{Internal Pub/Sub};
    C -- Event Notification --> D(Event Listener / Trigger);
    D -- Schedules Job --> E{Job Manager};
    E -- Executes Job --> F(Provider B: Job Handler);
    F -- Optional: Executes Tool --> G(Provider A: Tool Execution);
    F -- Processes Data --> F;
    F -- Optional: Publishes New Event --> C;

    subgraph "Example: Unipile -> Email Preparator"
        direction LR
        ExtEmail[External Email Service] --> UnipileWebhook{Unipile: Webhook Handler};
        UnipileWebhook -- Publishes "unipile.email.received" --> PubSubBus[Pub/Sub];
        PubSubBus -- Notifies --> Trigger(Event Trigger);
        Trigger -- Schedules Job --> JobMgr{Job Manager};
        JobMgr -- Executes Job --> EmailPrepJob(Email Preparator: process_email Job);
        EmailPrepJob -- Executes "unipile.get_email" --> UnipileTool{Unipile Provider Tool};
        EmailPrepJob -- Processes Email --> EmailPrepJob;
        EmailPrepJob -- Publishes "email.draft.prepared" --> PubSubBus;
    end

This pattern, leveraging the core Pub/Sub and Job Manager systems while respecting Provider Autonomy, enables sophisticated, maintainable workflows within the SLOP Server.

graph LR
    A[External Event Source] --> B(Provider A: Webhook Listener);
    B -- Publishes Event --> C{Internal Pub/Sub};
    C -- Event Notification --> D(Provider B: Event Listener / Job Trigger);
    D -- Processes Data --> D;
    D -- Publishes New Event --> C;
    C -- Event Notification --> E(Provider C: Event Listener / Job Trigger);
    subgraph "Example: Unipile -> Email Preparator"
        direction LR
        ExtEmail[External Email Service] --> UnipileWebhook{Unipile Provider Webhook};
        UnipileWebhook -- "unipile.email.received" --> PubSubBus[Pub/Sub];
        PubSubBus -- Trigger --> EmailPrepJob(Email Preparator Job);
        EmailPrepJob -- Fetches Email via Unipile Tool --> UnipileTool{Unipile Provider Tool};
        EmailPrepJob -- Processes Email --> EmailPrepJob;
        EmailPrepJob -- "email.draft.prepared" --> PubSubBus;
    end

Provider Descriptions (New Feature)

To aid developers and potentially LLMs in understanding and utilizing providers effectively, each provider's reference section should include a standardized description block. This block provides essential context beyond the basic tool and event listings.

Purpose:

  • To offer a quick overview of the provider's function and scope.
  • To clearly state any prerequisites needed before the provider can be used.
  • To explain the authentication mechanisms required, if any.
  • To illustrate typical usage patterns through common scenarios.
  • To highlight known limitations or important considerations.

Standard Structure:

Each provider's reference section (under Provider Reference) should contain a subsection formatted as follows:

*   **Overview:** A brief, one- or two-sentence summary of the provider's main purpose.
*   **Description:**
    *   **Prerequisites:** List any setup steps, external accounts, configurations (e.g., specific environment variables beyond basic API keys), or other providers that must be in place before this provider can function correctly.
    *   **Authentication:** Detail how the provider authenticates with external services (e.g., API Keys, OAuth2 flow initiated via a specific tool, credentials stored in config). Explain if user interaction is required for setup.
    *   **Common Use Cases / Scenarios:** Describe 2-3 typical ways the provider is used. For each scenario:
        *   Provide a brief narrative description.
        *   Include a sequence of API calls (using the SLOP Server API, e.g., `POST /api/tools/{tool_id}`) demonstrating the scenario. Show example JSON request bodies for the tool calls.
        *   Focus on the interaction with the SLOP server's tools, *not* the direct API calls to the underlying external service.
    *   **Limitations:** Mention any known constraints, edge cases, or aspects the provider does not cover (e.g., specific data types not supported, rate limits inherited from external services, features not yet implemented).

Example Snippet (Conceptual for a hypothetical 'Calendar' provider):

### Calendar (`calendar`)
*   **Overview:** Provides tools to interact with users' external calendar services (e.g., Google Calendar, Outlook Calendar).
*   **Description:**
    *   **Prerequisites:** Requires the `unipile` provider to be configured and the user to have connected their calendar account via Unipile.
    *   **Authentication:** Relies on the authentication tokens managed by the `unipile` provider. No separate authentication setup is needed specifically for the `calendar` provider itself, but the underlying Unipile connection must be active.
    *   **Common Use Cases / Scenarios:**
        1.  **List Upcoming Events:** Retrieve a list of the user's upcoming calendar events.
            *   Narrative: A user wants to see their schedule for the next day.
            *   API Call Sequence (the `account_id` is obtained via `unipile.list_accounts`):
                ```
                POST /api/tools/calendar.list_events
                {
                  "account_id": "acc_12345",
                  "start_time": "2025-05-04T00:00:00Z",
                  "end_time": "2025-05-05T00:00:00Z"
                }
                ```
        2.  **Create a New Event:** Add a new event to the user's calendar.
            *   Narrative: Schedule a meeting based on a user request.
            *   API Call Sequence:
                ```
                POST /api/tools/calendar.create_event
                {
                  "account_id": "acc_12345",
                  "summary": "Project Sync Meeting",
                  "start_time": "2025-05-06T14:00:00Z",
                  "end_time": "2025-05-06T15:00:00Z",
                  "attendees": ["user@example.com", "colleague@example.com"]
                }
                ```
    *   **Limitations:** Does not currently support managing recurring events or calendar sharing settings. Relies entirely on the capabilities exposed by the underlying Unipile connection.
*   **Tools:**
    *   `calendar.list_events`: ...
    *   `calendar.create_event`: ...
*   **Webhooks:** None.
*   **Events:**
    *   `calendar.event.created`: Published after a new event is successfully created via the tool.
*   **Jobs:** None.
*   **Configuration:** None (relies on Unipile configuration).
*   **Notes:** Ensure the Unipile account sync is up-to-date for accurate calendar information.

This standardized description block aims to significantly improve the usability and discoverability of provider functionalities.
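
When writing the API call sequences for a scenario, it can help to check them against a small client. The following is a rough sketch of how a `POST /api/tools/{tool_id}` request might be built in Go. The helper name `newToolRequest`, the base URL, and the bearer-token header are assumptions made for illustration; the actual authentication scheme is defined by the SLOP Server, not by this snippet.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// newToolRequest builds a POST /api/tools/{tool_id} request with a JSON body.
// The Authorization header format is an assumption, not the documented scheme.
func newToolRequest(baseURL, toolID, token string, args map[string]interface{}) (*http.Request, error) {
	body, err := json.Marshal(args)
	if err != nil {
		return nil, fmt.Errorf("failed to encode tool arguments: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, baseURL+"/api/tools/"+toolID, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("failed to build request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token)
	return req, nil
}

func main() {
	// Arguments taken from the conceptual Calendar example.
	req, err := newToolRequest("http://localhost:8080", "calendar.list_events", "example-token",
		map[string]interface{}{
			"account_id": "acc_12345",
			"start_time": "2025-05-04T00:00:00Z",
			"end_time":   "2025-05-05T00:00:00Z",
		})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// The request is only printed here; send it with an http.Client to execute the tool.
	fmt.Println(req.Method, req.URL.Path)
}
```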