# MFO Provider Unified Documentation

Version: 1.0 | Date: May 03, 2025
## 1. Introduction

Welcome to the unified documentation for developing and understanding providers within the SLOP (Simple Language Open Protocol) Server. This document aims to be a comprehensive resource for developers looking to extend the capabilities of the SLOP server by creating new providers (as packages under `pkg/providers/`) or integrating existing ones.
### Purpose
The primary purpose of this documentation is to provide a clear, detailed, and technically accurate guide for developers. It covers the core architectural concepts underpinning the provider system, offers a step-by-step guide to developing new providers, and serves as a reference for the functionalities offered by the currently available providers. Additionally, it introduces a standardized way to describe provider capabilities, including prerequisites, authentication methods, and common usage scenarios, to facilitate easier integration and understanding.
### Target Audience
This document is primarily intended for software developers involved in:
- Building new providers to integrate external services or custom logic into the SLOP server.
- Maintaining or extending existing SLOP providers.
- Understanding the capabilities and interaction patterns of SLOP providers for integration purposes.
While the focus is technical, the inclusion of use-case scenarios and clear descriptions may also be beneficial for system architects and integrators working with the SLOP ecosystem.
### Scope
This documentation covers:
- Core Concepts: The fundamental principles of the SLOP provider architecture, including autonomy, registration, tools, webhooks, events, jobs, storage, configuration, and chaining.
- Development Guide: A practical guide to creating a new provider, from initial setup to testing.
- Provider Reference: Detailed descriptions of the existing providers (`template_example`, `filesystem`, `llm`, `unipile`, `email_draft_preparator`), including their specific tools, events, configurations, and usage scenarios with API examples.
- API Summary: A concise overview of the relevant SLOP Server API endpoints for interacting with providers, tools, events, and jobs.
It consolidates information from previous guides and internal documentation, incorporating clarifications and adding the new standardized provider description format.
## Provider Autonomy
Maintaining autonomy between providers is a core principle in the SLOP Server architecture. This ensures that providers are decoupled, modular, and easier to maintain and update independently.
Guideline: Provider packages (e.g., `pkg/providers/unipile`) MUST remain autonomous and decoupled from one another.
Core Principles:
- No Direct Cross-Provider Imports: A provider package (e.g., `pkg/providers/A`) MUST NOT directly import another specific provider package (e.g., `pkg/providers/B`). The only exception is importing shared `models` packages if absolutely necessary.
- No Direct Cross-Provider Calls: A provider package MUST NOT directly call functions or methods defined in another specific provider package.
- Communication via Abstraction: Interaction between providers MUST occur through abstracted mechanisms provided by the SLOP Server core:
  - Pub/Sub System: Providers publish generic events (e.g., `providerA.entity.created`) using the `pubsub.Publisher` interface. Other components (potentially dedicated listeners outside the provider packages, or event triggers) subscribe to these events to initiate actions.
  - Job Manager: Complex workflows or actions involving multiple providers should be orchestrated by creating jobs via the central `jobmanager.JobManager`. Jobs can be triggered by event listeners or external API calls, not directly by another provider.
  - Resource Manager: Providers can share data indirectly by creating or reading resources via the `resources.Manager`. Event payloads can contain resource IDs instead of large data blobs.
  - Tool Execution: A provider can execute a tool registered by another provider via the `providers.ProviderRegistry` or `providers.Provider` interface, but this should be used judiciously, typically within a job or a specific workflow handler, not as a general-purpose direct call mechanism.
- Shared Dependencies: Providers MAY import shared, generic packages like internal core types and interfaces (`internal/providers`, `internal/core`, `internal/jobmanager`, `internal/resources`, `internal/pubsub`) or common model definitions, but NOT specific implementations of other providers.
Rationale:
- Reduced Complexity: Prevents tangled dependencies and makes the system easier to understand.
- Improved Maintainability: Changes within one provider are less likely to break others.
- Enhanced Testability: Providers can be tested in isolation more effectively.
- Flexibility: Allows providers to be added, removed, or replaced with minimal impact on the rest of the system.
Example Violation (Incorrect):
```go
// pkg/providers/providerA/adapter.go
package providerA

import (
    // ... other imports
    providerB "gitlab.com/webigniter/slop-server/pkg/providers/providerB" // <-- VIOLATION: Direct import
)

func (a *AdapterA) HandleSomeEvent(...) error {
    // ... process event ...

    // Directly call a function in Provider B
    err := providerB.DoSomethingSpecific(...) // <-- VIOLATION: Direct call
    // ...
}
```
Example Correct Implementation (Using Pub/Sub and Jobs):
```go
// pkg/providers/providerA/adapter.go
package providerA

import (
    "context"
    "log"

    "gitlab.com/webigniter/slop-server/internal/pubsub"
    // NO import for providerB
)

const EventProviderACompleted = "providerA.task.completed"

func (a *AdapterA) HandleSomeEvent(ctx context.Context, ...) error {
    // ... process event ...
    resultData := map[string]interface{}{"key": "value"}

    // Publish a generic event
    if err := a.notifier.Publish(ctx, EventProviderACompleted, resultData); err != nil {
        log.Printf("ERROR publishing event %s: %v", EventProviderACompleted, err)
    }

    // DO NOT call Provider B directly.
    return nil
}
```
```go
// --- Elsewhere (e.g., internal/listeners/listeners.go or via Event Trigger Config) ---
import (
    "context"

    "gitlab.com/webigniter/slop-server/internal/jobmanager"
    // Potentially import providerB's job registration details if needed
)

func handleProviderACompletion(ctx context.Context, eventPayload map[string]interface{}) {
    // Extract necessary info from eventPayload

    // Get Job Manager instance (dependency injection)
    jobMgr := getJobManagerFromContext(ctx)

    // Schedule a job defined by Provider B
    jobParams := map[string]interface{}{ /* derived from eventPayload */ }
    job := jobmanager.Job{Type: "providerB.process_data", Parameters: jobParams}
    if _, err := jobMgr.ScheduleJob(ctx, job); err != nil {
        /* ... handle error ... */
    }
}
```
By adhering to these principles, the SLOP Server maintains a clean, scalable, and robust architecture.
## Provider Registration
For a provider to be recognized and utilized by the SLOP Server, it must be registered during the server's initialization phase. This process makes the provider's capabilities (tools, webhooks, jobs, events) available to the rest of the system.
Mechanism:
- Implementation: The provider must implement the `providers.Provider` interface.
- Instantiation: A factory function (e.g., `NewProvider()`) is typically defined within the provider's package to create an instance of the provider's main struct (adapter).
- Registration Call: The core of the registration happens in the `pkg/providers/providers.go` file. An `init()` function within this package calls `providers.RegisterProvider()` for each provider that should be included in the server build.
Steps for Developers:
- Create Provider Instance: Ensure your provider package has a constructor function (e.g., `func NewMyProvider() *MyProvider`).
- Modify `pkg/providers/providers.go`:
  - Add an import statement for your new provider package.
  - Inside the `init()` function, call `providers.RegisterProvider()`, passing an instance created by your constructor.

Example (`pkg/providers/providers.go`):
```go
package providers

import (
    // Core provider interface
    "gitlab.com/webigniter/slop-server/internal/providers"

    // Import existing providers
    "gitlab.com/webigniter/slop-server/pkg/providers/filesystem"
    "gitlab.com/webigniter/slop-server/pkg/providers/llm/openai"
    "gitlab.com/webigniter/slop-server/pkg/providers/template_example"
    "gitlab.com/webigniter/slop-server/pkg/providers/unipile"
    "gitlab.com/webigniter/slop-server/pkg/providers/email_draft_preparator"

    // Import YOUR new provider package
    "gitlab.com/webigniter/slop-server/pkg/providers/my_provider" // <--- Add this import
)

func init() {
    // Register core/existing providers
    providers.RegisterProvider(filesystem.NewProvider())
    providers.RegisterProvider(openai.NewProvider())
    providers.RegisterProvider(template_example.NewProvider())
    providers.RegisterProvider(unipile.NewProvider())
    providers.RegisterProvider(emaildraftpreparator.NewProvider())

    // Register your new provider
    providers.RegisterProvider(myprovider.NewProvider()) // <--- Add this line
}
```
Initialization Sequence:
When the SLOP server starts:
- The `init()` function in `pkg/providers/providers.go` runs, calling `RegisterProvider` for all included providers.
- The central `providers.Registry` stores instances of each registered provider.
- During server setup, the core system iterates through the registered providers:
  - Calls the `Initialize()` method on each provider.
  - Calls `RegisterTools()`, `RegisterWebhooks()`, `RegisterJobs()`, and `RegisterEventDefinitions()` (if implemented) on each provider, passing the respective registries (`ToolRegistry`, `WebhookRegistry`, `JobRegistry`, `EventRegistry`).
This ensures that all provider capabilities are properly configured and made available before the server starts accepting requests.
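As a minimal sketch tying this together (the interface names are the ones described in the sections below; their exact signatures live in `internal/providers`), a provider can assert at compile time which of these interfaces it implements:

```go
package myprovider

import (
    "gitlab.com/webigniter/slop-server/internal/providers"
)

// Compile-time assertions: the build fails if MyProvider stops satisfying
// an interface the initialization sequence expects. Only assert the
// optional interfaces your provider actually implements.
var (
    _ providers.Provider                   = (*MyProvider)(nil)
    _ providers.WebhookRegisteringProvider = (*MyProvider)(nil)
    _ providers.JobRegisteringProvider     = (*MyProvider)(nil)
    _ providers.EventInfoProvider          = (*MyProvider)(nil)
)
```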
## Webhooks (Incoming)
Webhooks provide a mechanism for external services to send asynchronous notifications or data to the SLOP Server. Providers can register specific endpoints to listen for these incoming requests.
Purpose:
- To allow external systems (e.g., SaaS platforms, Git repositories, payment gateways) to push information to SLOP in real-time or near real-time.
- To trigger internal workflows or data processing based on external events.
Mechanism:
- External Service Configuration: The external service is configured to send HTTP POST requests to a specific URL provided by the SLOP server.
- Provider Implementation: The provider implements the `providers.WebhookRegisteringProvider` interface.
- Registration: During initialization, the provider registers a specific path (e.g., `/webhooks/{provider}/{webhook_id}`) and a corresponding handler function using the `providers.WebhookRegistry`.
- Handling: When an HTTP request hits a registered webhook path, the SLOP server routes it to the provider's designated handler function.
- Processing: The handler function is responsible for:
  - Reading and parsing the request body (often JSON, but it can vary).
  - Validating the payload, and potentially verifying signatures if implemented (see the HMAC sketch after this list).
  - Performing minimal immediate processing.
  - Crucially: For any non-trivial processing, the handler should schedule a background job using the `JobManager` to avoid blocking the webhook response and to ensure reliability. The job can be passed the necessary information (like the payload or a reference to it).
  - Returning an appropriate HTTP status code quickly (e.g., `200 OK` for immediate success, `202 Accepted` if a job was scheduled).
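Signature verification is specific to each external service. As a hedged sketch, assuming the service signs the raw request body with a shared secret and sends the hex digest in a hypothetical `X-Signature-256` header, HMAC validation could look like this:

```go
package myprovider

import (
    "crypto/hmac"
    "crypto/sha256"
    "encoding/hex"
    "net/http"
)

// verifySignature checks an HMAC-SHA256 signature over the raw request body.
// The header name and the secret's source are assumptions; consult the
// external service's documentation for its real signing scheme.
func verifySignature(r *http.Request, body, secret []byte) bool {
    got := r.Header.Get("X-Signature-256") // hypothetical header name
    mac := hmac.New(sha256.New, secret)
    mac.Write(body)
    want := hex.EncodeToString(mac.Sum(nil))
    // hmac.Equal compares in constant time, avoiding timing side channels.
    return hmac.Equal([]byte(got), []byte(want))
}
```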
Implementation (`WebhookRegisteringProvider` interface):

```go
// Interface definition (internal/providers/interfaces.go)
type WebhookRegisteringProvider interface {
    RegisterWebhooks(registry WebhookRegistry) error
}

// Example Provider Implementation (adapter.go)
var _ providers.WebhookRegisteringProvider = (*MyProvider)(nil)

func (p *MyProvider) RegisterWebhooks(registry providers.WebhookRegistry) error {
    // Construct a unique path for this webhook
    webhookPath := fmt.Sprintf("/webhooks/%s/my_event", ProviderName)
    err := registry.RegisterWebhook(webhookPath, p.handleMyEventWebhook)
    if err != nil {
        return fmt.Errorf("failed to register webhook %s: %w", webhookPath, err)
    }
    return nil
}
```
```go
// Example Handler Implementation (e.g., webhook_handler.go)
func (p *MyProvider) handleMyEventWebhook(w http.ResponseWriter, r *http.Request) {
    // 1. Read Body
    body, err := io.ReadAll(r.Body)
    if err != nil { /* Handle error, return 400 */ return }

    // 2. Parse Payload
    var payload MyEventPayload
    if err := json.Unmarshal(body, &payload); err != nil { /* Handle error, return 400 */ return }

    // 3. Validate (Optional)
    if !isValid(payload) { /* Handle error, return 400/403 */ return }

    // 4. Schedule Job for actual processing
    jobParams := map[string]interface{}{"payload": payload} // Or store payload as resource and pass ID
    job := jobmanager.Job{Type: "myprovider.process_event", Parameters: jobParams}
    jobID, err := p.jobManager.ScheduleJob(r.Context(), job) // Assuming jobManager is available
    if err != nil { /* Handle error, return 500 */ return }

    // 5. Respond Quickly
    w.WriteHeader(http.StatusAccepted)
    fmt.Fprintf(w, `{"status":"scheduled", "job_id":"%s"}`, jobID)
}
```
Webhook Flow Diagram:
```mermaid
sequenceDiagram
    participant ExternalService
    participant SLOP_Server
    participant ProviderWebhookHandler
    participant JobManager
    participant BackgroundJob

    ExternalService->>+SLOP_Server: POST /webhooks/{provider}/{webhook_id} (Payload)
    SLOP_Server->>+ProviderWebhookHandler: Invoke handler(request)
    ProviderWebhookHandler->>ProviderWebhookHandler: Validate/Parse Payload
    alt Minimal Processing (Not Recommended for complex tasks)
        ProviderWebhookHandler-->>-SLOP_Server: 2xx OK Response
    else Background Processing Needed (Recommended)
        ProviderWebhookHandler->>+JobManager: ScheduleJob(JobType, Payload/PayloadRef)
        JobManager-->>-ProviderWebhookHandler: JobID
        ProviderWebhookHandler-->>-SLOP_Server: 202 Accepted (JobID)
        JobManager->>+BackgroundJob: Execute Job(Payload/PayloadRef)
        BackgroundJob->>BackgroundJob: Perform processing
        BackgroundJob-->>-JobManager: Job Complete/Failed
    end
    SLOP_Server-->>-ExternalService: Response (2xx)
```
API Access:
- `POST /webhooks/:provider/:webhook_id`: The public endpoint where external services send webhook data. The exact path is determined during provider registration.
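For illustration, assuming a provider registered the path `/webhooks/myprovider/my_event` as in the example above, a delivery from the external service is a plain HTTP POST (the payload shape here is hypothetical):

```
POST /webhooks/myprovider/my_event HTTP/1.1
Host: slop-server.example.com
Content-Type: application/json

{"event_type": "my_event", "data": {"id": "123"}}
```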
Webhooks are essential for integrating SLOP with external systems that initiate communication.
## Events (Internal Pub/Sub)
Beyond receiving external webhooks, providers can publish internal events using the SLOP Server's Pub/Sub (Publish/Subscribe) system. This mechanism facilitates decoupled communication between different parts of the server, including between providers or between providers and core services.
Purpose:
- To notify other components about significant occurrences within a provider (e.g., `unipile.email.received`, `myprovider.document.processed`).
- To trigger subsequent actions or workflows in a decoupled manner (see Provider Chaining).
- To enable internal automation via Event Triggers.
- To allow external clients to subscribe to server events via Webhook Subscriptions (though the client interacts with the subscription API, not the internal Pub/Sub directly).
Mechanism:
The internal Pub/Sub system relies on the `pubsub.Publisher` interface (defined in `internal/pubsub/interface.go`). Providers that need to publish events should receive an implementation of this interface via dependency injection during their initialization.
Workflow for Publishing Events:
- Dependency: Ensure your provider's adapter struct includes a field for `pubsub.Publisher` and receives it in its constructor.
- Define Event Channel: Choose a unique, descriptive channel name, typically following the pattern `provider_name.resource_type.action` (e.g., `unipile.account.sync_status.updated`, `email_draft_preparator.draft.prepared`).
- Define Payload Struct: Create a Go struct representing the data for the event. This struct should be serializable to JSON and contain all necessary information for consumers.
- Publish: Call the `notifier.Publish(ctx, channelName, payloadStruct)` method where the event occurs:

```go
func (a *MyProviderAdapter) completeProcessing(ctx context.Context, docID string, url string) error {
    // ... processing logic ...

    eventPayload := MyProviderDocumentProcessedEvent{
        DocumentID:       docID,
        Status:           "completed",
        ProcessedFileURL: url,
        Timestamp:        time.Now(),
    }

    channel := "myprovider.document.processed"
    log.Printf("Publishing event '%s' for document %s", channel, docID)

    err := a.notifier.Publish(ctx, channel, eventPayload)
    if err != nil {
        log.Printf("ERROR publishing event '%s': %v", channel, err)
        // Decide how to handle publish errors (retry, log, ignore?)
    }
    return err // Or nil, depending on criticality
}
```
Payload Handling:
- Serialization: The payload struct passed to `Publish` must be marshallable to JSON using `encoding/json`.
- Large Payloads: The system (specifically the `internal/pubsub/postgres` implementation) automatically handles large payloads. If the marshalled JSON exceeds a certain size (~7.5 KB), it is transparently stored using the `resources.Manager`, and only a resource reference is published internally. Providers do not need to manually check payload size before publishing.
Event Discovery:
To make events discoverable and usable by API clients (for webhook subscriptions or triggers), providers should implement the `providers.EventInfoProvider` interface.
```go
// Interface definition (internal/providers/interfaces.go)
type EventInfoProvider interface {
    // GetEventDefinitions returns basic info about published events.
    GetEventDefinitions() []EventDefinition
    // GetDetailedEventInfo returns detailed info (e.g., schema, examples) about events.
    GetDetailedEventInfo() []DetailedEventInfo
}

// EventDefinition describes an event published by a provider.
type EventDefinition struct {
    Channel        string      `json:"channel"`                  // Unique event channel name (e.g., provider.resource.action)
    Description    string      `json:"description"`              // User-friendly description of the event
    SourceProvider string      `json:"source_provider"`          // ID of the provider publishing the event
    PayloadSchema  interface{} `json:"payload_schema,omitempty"` // Optional: JSON Schema describing the payload structure
}

// DetailedEventInfo provides extended information about an event.
type DetailedEventInfo struct {
    Channel        string      `json:"channel"`
    Description    string      `json:"description"`
    SourceProvider string      `json:"source_provider"`
    PayloadSchema  interface{} `json:"payload_schema,omitempty"`
    // ExamplePayload interface{} `json:"example_payload,omitempty"` // Optional: Example payload
    // Notes          string      `json:"notes,omitempty"`           // Optional: Additional notes
}

// Example Provider Implementation (adapter.go)
var _ providers.EventInfoProvider = (*MyProvider)(nil)

func (p *MyProvider) GetEventDefinitions() []providers.EventDefinition {
    return []providers.EventDefinition{
        {
            Channel:        "myprovider.document.processed",
            Description:    "Triggered when a document has finished processing.",
            SourceProvider: ProviderName,
            // Optionally provide a JSON schema for the payload struct
        },
        // ... other events published by this provider
    }
}

// Example implementation for detailed info (can return nil if not needed)
func (p *MyProvider) GetDetailedEventInfo() []providers.DetailedEventInfo {
    // Return nil or populate detailed info based on EventDefinitions
    return nil
}
```
Registered event definitions can be queried via the authenticated endpoint:

- `GET /api/events/definitions` (TODO: Confirm endpoint, and whether it uses the detailed info)
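Assuming this endpoint returns the registered `EventDefinition` entries serialized with the JSON tags shown above, a response might look like this (illustrative):

```json
[
  {
    "channel": "myprovider.document.processed",
    "description": "Triggered when a document has finished processing.",
    "source_provider": "my_provider"
  }
]
```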
Internal events are a powerful tool for building reactive and decoupled workflows within the SLOP Server.
## Jobs (Background Processing)
For operations that might take a significant amount of time (e.g., processing large files, interacting with slow external APIs, complex computations), providers should utilize the SLOP Server's Job Manager to execute these tasks asynchronously in the background.
Purpose:
- Prevent Blocking: Avoid tying up API request handlers or webhook listeners while long operations complete.
- Improve Responsiveness: Allow the server to respond quickly to initial requests (e.g., with a `202 Accepted` status and a Job ID).
- Enhance Reliability: Jobs can be queued, managed, and potentially retried if they fail.
- Resource Management: Allows the server to manage the execution of resource-intensive tasks more effectively.
Mechanism:
The system relies on a central `jobmanager.JobManager` and associated interfaces:

- `jobmanager.JobManager`: Interface for scheduling and managing jobs.
- `jobmanager.JobRegistry`: Interface used during initialization to register job types and their corresponding handler functions.
- `providers.JobRegisteringProvider`: Interface that providers implement to register their job types.
- `jobmanager.Job`: Struct representing a job instance, containing its type, parameters, schedule, status, etc.
Workflow:
- Define Job Type: Choose a unique string identifier for the job type, typically `provider_name.job_action` (e.g., `my_provider.process_data`, `email_draft_preparator.process_email`).
- Implement Handler: Create a function within the provider that performs the actual work. This function must match the signature `func(ctx context.Context, params map[string]interface{}) error`.
- Register Job Type: Implement the `providers.JobRegisteringProvider` interface and use the `RegisterJobs` method to associate the job type string with its handler function via the `JobRegistry`.
- Schedule Job: When a long-running task needs to be performed (often triggered by an API call or a webhook handler), obtain an instance of the `JobManager` (usually via context or dependency injection) and call `ScheduleJob(ctx, job)`. The `jobmanager.Job` struct passed to this method includes the `Type`, `Parameters` (as `map[string]interface{}`), and desired `Schedule` (often `time.Now()` for immediate queuing).
- Execution: The Job Manager picks up queued jobs and executes their registered handler functions, passing the stored parameters.
Implementation Details:
- Registering Job Types (`JobRegisteringProvider`):

```go
// Interface definition (internal/providers/interfaces.go)
type JobRegisteringProvider interface {
    RegisterJobs(registry jobmanager.JobRegistry) error
}

// Example Provider Implementation (adapter.go)
var _ providers.JobRegisteringProvider = (*MyProvider)(nil)

const JobTypeProcessData = "my_provider.process_data"

func (p *MyProvider) RegisterJobs(registry jobmanager.JobRegistry) error {
    // Link the job type string to its handler function
    err := registry.RegisterJobType(JobTypeProcessData, p.processDataJob)
    if err != nil {
        return fmt.Errorf("failed to register job %s: %w", JobTypeProcessData, err)
    }
    return nil
}
```

- Implementing Job Handler:

```go
// Example Handler (e.g., jobs.go)
func (p *MyProvider) processDataJob(ctx context.Context, params map[string]interface{}) error {
    // 1. Extract and validate parameters
    dataID, ok := params["data_id"].(string)
    if !ok {
        return fmt.Errorf("missing or invalid 'data_id' parameter")
    }

    // 2. Perform the long-running task
    log.Printf("Starting job %s for data %s", JobTypeProcessData, dataID)
    // ... interact with external services, process data ...
    time.Sleep(30 * time.Second) // Simulate work
    log.Printf("Finished job %s for data %s", JobTypeProcessData, dataID)

    // 3. Return nil on success, or an error on failure
    return nil
}
```

- Scheduling a Job:

```go
// Example within a Webhook Handler or API Handler
func (p *MyProvider) handleIncomingRequest(w http.ResponseWriter, r *http.Request) {
    // ... parse request, get dataID ...
    dataID := "some-data-identifier"

    // Get JobManager (example: from context)
    jobManager, ok := r.Context().Value(jobmanager.JobManagerKey).(jobmanager.JobManager)
    if !ok { /* Handle error - JobManager not found */ return }

    // Prepare job parameters
    jobParams := map[string]interface{}{"data_id": dataID}

    // Create and schedule the job
    job := jobmanager.Job{
        Type:       JobTypeProcessData,
        Parameters: jobParams,
        Schedule:   time.Now(), // Queue for immediate execution
        // Name, UserID etc. might be set automatically or manually
    }
    jobID, err := jobManager.ScheduleJob(r.Context(), job)
    if err != nil { /* Handle scheduling error, return 500 */ return }

    // Respond to the original request
    w.WriteHeader(http.StatusAccepted)
    fmt.Fprintf(w, `{"status":"processing_scheduled", "job_id":"%s"}`, jobID)
}
```
API Access (Private - Requires Auth):
- `POST /api/jobs`: Manually create and queue a new background job.
- `GET /api/jobs`: List background jobs (filterable by status, user).
- `GET /api/jobs/:jobID`: Get details of a specific background job.
- `POST /api/jobs/:jobID/cancel`: Request cancellation of a running job.
- `GET /api/jobs/:jobID/history`: Get the status history of a specific job.
Using the Job Manager is crucial for building robust and scalable providers that handle potentially long-running tasks without impacting server performance.
## Storage
Providers often need to persist data related to their operation, such as configuration, state, user credentials, or processed results. To maintain autonomy and testability, providers should manage their own storage through dedicated interfaces.
Purpose:
- To store provider-specific data persistently.
- To decouple the provider's logic from the specific storage backend (e.g., PostgreSQL, Redis, in-memory).
- To facilitate easier testing by allowing mock storage implementations.
Mechanism:
- Define Storage Interface: Within the provider's package, define a Go interface (e.g., `Store`) that specifies the data access methods required by the provider (e.g., `SaveItem`, `GetItem`, `DeleteItem`).
- Implement Interface: Create one or more concrete implementations of the storage interface. A common implementation uses PostgreSQL, often utilizing the `database/sql` package or libraries like `pgx`.
- Dependency Injection: The provider's adapter struct should hold an instance of the storage interface. This instance (e.g., `*PgStore`) is typically created during server initialization (using the shared database connection pool) and injected into the provider's constructor.
- Usage: The provider's methods interact with the database solely through the defined storage interface methods.
Implementation Example:
- Define Interface (`store.go`):

```go
package myprovider

import "context"

// Store defines the storage interface for MyProvider
type Store interface {
    SaveItem(ctx context.Context, item Item) error
    GetItem(ctx context.Context, id string) (*Item, error)
    // ... other methods
}

// Item represents a data item
type Item struct {
    ID   string
    Data map[string]interface{}
    // ... other fields
}
```

- Implement with PostgreSQL (`pg_store.go`):

```go
package myprovider

import (
    "context"
    "database/sql" // Or pgxpool.Pool
    "encoding/json"
    "fmt"
)

var _ Store = (*PgStore)(nil) // Compile-time check

type PgStore struct {
    db *sql.DB // Or *pgxpool.Pool
}

func NewPgStore(db *sql.DB) *PgStore {
    return &PgStore{db: db}
}

func (s *PgStore) SaveItem(ctx context.Context, item Item) error {
    dataJSON, err := json.Marshal(item.Data)
    if err != nil {
        return fmt.Errorf("marshalling item data: %w", err)
    }
    _, err = s.db.ExecContext(ctx,
        `INSERT INTO my_provider_items (id, data) VALUES ($1, $2)
         ON CONFLICT (id) DO UPDATE SET data = $2`,
        item.ID, dataJSON)
    // ... handle error ...
    return err
}

func (s *PgStore) GetItem(ctx context.Context, id string) (*Item, error) {
    // ... query logic using s.db.QueryRowContext ...
}

// ... other methods
```

- Inject into Adapter (`adapter.go`):

```go
package myprovider

import (
    "context"
    "database/sql"
)

type MyProvider struct {
    store Store // Use the interface type
    // ... other fields
}

// Constructor receives the concrete store implementation
func NewProvider(db *sql.DB) *MyProvider {
    store := NewPgStore(db) // Create the concrete store
    return &MyProvider{
        store: store, // Inject it
    }
}

func (p *MyProvider) SomeOperation(ctx context.Context, itemID string) error {
    // Use the store via the interface
    item, err := p.store.GetItem(ctx, itemID)
    // ...
}
```
Database Schema:
Providers requiring database storage are responsible for defining their necessary SQL table schemas. These migrations should be managed as part of the overall SLOP Server database migration strategy.
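For example, a migration matching the `PgStore` sketch above might look like the following (table and column names follow the example; adapt it to the project's migration tooling):

```sql
CREATE TABLE IF NOT EXISTS my_provider_items (
    id   TEXT PRIMARY KEY,
    data JSONB NOT NULL DEFAULT '{}'::jsonb
);
```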
By abstracting storage operations behind interfaces, providers remain modular and adaptable to different deployment environments or changes in storage technology.
## Configuration
Providers often require configuration settings to function correctly, such as API keys, service endpoints, timeouts, or feature flags. The SLOP Server provides mechanisms for providers to load and manage these settings.
Purpose:
- To allow customization of provider behavior without modifying code.
- To securely manage sensitive information like API keys or credentials.
- To adapt provider functionality to different environments (development, staging, production).
Common Approaches:
- Environment Variables: The most common and recommended method, especially for sensitive data or settings that vary across environments. Environment variables are easily managed in containerized deployments and CI/CD pipelines.
- Configuration Files: Less common for individual providers in the SLOP context, as the main server configuration might handle broader settings. If used, providers might load specific sections from a central configuration file managed by the server.
- Database: For dynamic configuration that needs to be updated without restarting the server (though this is less typical for initial setup parameters like API keys).
Implementation Strategy:
- Define Config Struct: Create a Go struct within the provider package to hold all its configuration parameters.
- Loading Function: Implement a function (e.g., `LoadConfig()`) that reads values from the environment (or other sources) and populates the config struct. This function should handle defaults and validation.
- Initialization: Call the `LoadConfig()` function during the provider's initialization phase (e.g., within the `NewProvider` constructor or the `Initialize` method). The loaded config struct can then be stored within the provider's adapter struct.
Implementation Example:
- Define Config Struct (`config.go`):

```go
package myprovider

import (
    "fmt"
    "os"
    "strconv"
    "time"
)

// Config holds the provider configuration
type Config struct {
    APIKey      string
    APIEndpoint string
    Timeout     time.Duration
    MaxRetries  int
}

// LoadConfig loads configuration from environment variables
func LoadConfig() (*Config, error) {
    apiKey := os.Getenv("MY_PROVIDER_API_KEY")
    if apiKey == "" {
        return nil, fmt.Errorf("MY_PROVIDER_API_KEY environment variable is required")
    }

    endpoint := os.Getenv("MY_PROVIDER_API_ENDPOINT")
    if endpoint == "" {
        endpoint = "https://api.example.com/v1" // Default value
    }

    timeoutStr := os.Getenv("MY_PROVIDER_TIMEOUT_SECONDS")
    timeout := 30 * time.Second // Default value
    if timeoutStr != "" {
        timeoutSec, err := strconv.Atoi(timeoutStr)
        if err != nil {
            return nil, fmt.Errorf("invalid MY_PROVIDER_TIMEOUT_SECONDS: %w", err)
        }
        timeout = time.Duration(timeoutSec) * time.Second
    }

    retriesStr := os.Getenv("MY_PROVIDER_MAX_RETRIES")
    maxRetries := 3 // Default value
    if retriesStr != "" {
        var err error
        maxRetries, err = strconv.Atoi(retriesStr)
        if err != nil || maxRetries < 0 {
            return nil, fmt.Errorf("invalid MY_PROVIDER_MAX_RETRIES: %s", retriesStr)
        }
    }

    return &Config{
        APIKey:      apiKey,
        APIEndpoint: endpoint,
        Timeout:     timeout,
        MaxRetries:  maxRetries,
    }, nil
}
```

- Use in Provider (`adapter.go`):

```go
package myprovider

import (
    "context"
    "fmt"
    "log"
)

type MyProvider struct {
    config *Config
    // ... other fields like store, notifier, etc.
}

func NewProvider( /* dependencies */ ) (*MyProvider, error) {
    cfg, err := LoadConfig()
    if err != nil {
        return nil, fmt.Errorf("failed to load my_provider config: %w", err)
    }
    log.Println("MyProvider configuration loaded successfully.")
    return &MyProvider{
        config: cfg,
        // ... initialize other fields
    }, nil
}

func (p *MyProvider) Initialize(ctx context.Context) error {
    // Use config values if needed during initialization
    log.Printf("Initializing MyProvider with endpoint: %s", p.config.APIEndpoint)
    // ...
    return nil
}

// Other methods can access config via p.config
```
Best Practices:
- Environment Variables: Prefer environment variables for configuration, especially for secrets.
- Defaults: Provide sensible default values for non-critical settings.
- Clear Naming: Use clear and specific names for environment variables (e.g., `PROVIDERNAME_SETTING_NAME`).
). - Validation: Validate configuration values during loading to fail fast if settings are invalid.
- Documentation: Clearly document all required and optional configuration settings for the provider.
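As a small sketch of putting the loaded values to use (assuming the `Config` struct from the example above), the timeout can configure the HTTP client used to call the external API:

```go
import "net/http"

// newAPIClient builds an HTTP client from the loaded configuration.
// Retry logic using p.config.MaxRetries would wrap calls made with
// this client; it is omitted here.
func (p *MyProvider) newAPIClient() *http.Client {
    return &http.Client{
        Timeout: p.config.Timeout, // from MY_PROVIDER_TIMEOUT_SECONDS
    }
}
```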
## Provider Chaining
Provider chaining is a powerful pattern within the SLOP Server that allows providers to collaborate by passing information sequentially. An upstream provider might generate an event (often triggered by an external webhook or internal process), which is then consumed by a downstream provider. The downstream provider processes the data and may, in turn, emit its own events for further consumption, creating a processing pipeline.
Purpose:
- To build complex workflows by composing modular, single-purpose providers.
- To decouple different stages of a process (e.g., data ingestion, transformation, notification).
- To promote reusability of provider functionalities.
Mechanism:
Provider chaining typically relies on the interplay of Webhooks, Events, and Jobs:
- Initiation: An external event triggers a webhook handler in an upstream provider (Provider A), or an internal process in Provider A completes.
- Event Publication: Provider A publishes an internal event via the Pub/Sub system, containing relevant data or references (e.g., a resource ID).
- Event Consumption/Trigger:
  - A dedicated listener or an Event Trigger configured in the system detects the event published by Provider A.
  - This trigger schedules a background job for a downstream provider (Provider B).
- Job Execution: Provider B's job handler executes.
- Data Fetching (if needed): Provider B might need to fetch additional data, potentially by executing a tool exposed by Provider A (using the Provider Registry, as sketched after this list) or by accessing a shared resource (using the Resource Manager).
- Processing: Provider B performs its specific processing or transformation task.
- Further Publication (Optional): Provider B might publish its own event upon completion, allowing further chaining with Provider C, and so on.
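The Data Fetching step above mentions executing another provider's tool through the Provider Registry. The registry's exact API is not documented here, so the following is a hedged sketch with a hypothetical `ExecuteTool` method and an assumed `registry` field on the downstream adapter:

```go
package mydownstreamprovider

import (
    "context"
    "fmt"
)

// Hypothetical sketch: the real providers.ProviderRegistry API may differ.
func (p *MyDownstreamProvider) fetchEmail(ctx context.Context, accountID, emailID string) (map[string]interface{}, error) {
    // Execute a tool registered by the upstream provider through the central
    // registry instead of importing that provider's package directly.
    result, err := p.registry.ExecuteTool(ctx, "unipile.get_email", map[string]interface{}{
        "account_id": accountID, // parameter names are assumptions
        "email_id":   emailID,
    })
    if err != nil {
        return nil, fmt.Errorf("executing unipile.get_email: %w", err)
    }
    return result, nil
}
```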
Example: Unipile -> Email Draft Preparator
This chain processes incoming emails:
- Unipile (Provider A):
  - Receives a webhook from an external email service (e.g., Gmail) indicating a new email.
  - The webhook handler stores the raw payload as a resource.
  - Publishes an internal event `unipile.email.received` containing references like the account ID, email ID, and the resource ID of the raw payload.
- Event Trigger:
  - An event trigger is configured to listen for `unipile.email.received`.
  - When the event occurs, the trigger schedules a job of type `email_draft_preparator.process_email`.
- Email Draft Preparator (Provider B):
  - The `process_email` job handler executes.
  - It extracts the email ID and account ID from the job parameters (originating from the event payload).
  - It executes the `unipile.get_email` tool (belonging to Provider A) to fetch the full email details.
  - It processes the email content, potentially converting it to XML or another format.
  - It stores the processed result (e.g., as a new resource).
  - It publishes a new event, such as `email.draft.prepared`, containing the ID of the processed resource.
Provider Chaining Flow Diagram:
```mermaid
graph LR
    A[External Event Source] --> B(Provider A: Webhook/Process);
    B -- Publishes Event --> C{Internal Pub/Sub};
    C -- Event Notification --> D(Event Listener / Trigger);
    D -- Schedules Job --> E{Job Manager};
    E -- Executes Job --> F(Provider B: Job Handler);
    F -- Optional: Executes Tool --> G(Provider A: Tool Execution);
    F -- Processes Data --> F;
    F -- Optional: Publishes New Event --> C;

    subgraph "Example: Unipile -> Email Preparator"
        direction LR
        ExtEmail[External Email Service] --> UnipileWebhook{Unipile: Webhook Handler};
        UnipileWebhook -- Publishes "unipile.email.received" --> PubSubBus[Pub/Sub];
        PubSubBus -- Notifies --> Trigger(Event Trigger);
        Trigger -- Schedules Job --> JobMgr{Job Manager};
        JobMgr -- Executes Job --> EmailPrepJob(Email Preparator: process_email Job);
        EmailPrepJob -- Executes "unipile.get_email" --> UnipileTool{Unipile: Tool Execution};
        EmailPrepJob -- Processes Email --> EmailPrepJob;
        EmailPrepJob -- Publishes "email.draft.prepared" --> PubSubBus;
    end
```
This pattern, leveraging the core Pub/Sub and Job Manager systems while respecting Provider Autonomy, enables sophisticated, maintainable workflows within the SLOP Server.
## Provider Descriptions (New Feature)
To aid developers and potentially LLMs in understanding and utilizing providers effectively, each provider's reference section should include a standardized description block. This block provides essential context beyond the basic tool and event listings.
Purpose:
- To offer a quick overview of the provider's function and scope.
- To clearly state any prerequisites needed before the provider can be used.
- To explain the authentication mechanisms required, if any.
- To illustrate typical usage patterns through common scenarios.
- To highlight known limitations or important considerations.
Standard Structure:
Each provider's reference section (under Provider Reference) should contain a subsection formatted as follows:
* **Overview:** A brief, one or two-sentence summary of the provider's main purpose.
* **Description:**
* **Prerequisites:** List any setup steps, external accounts, configurations (e.g., specific environment variables beyond basic API keys), or other providers that must be in place before this provider can function correctly.
* **Authentication:** Detail how the provider authenticates with external services (e.g., API Keys, OAuth2 flow initiated via a specific tool, credentials stored in config). Explain if user interaction is required for setup.
* **Common Use Cases / Scenarios:** Describe 2-3 typical ways the provider is used. For each scenario:
* Provide a brief narrative description.
* Include a sequence of API calls (using the SLOP Server API, e.g., `POST /api/tools/{tool_id}`) demonstrating the scenario. Show example JSON request bodies for the tool calls.
* Focus on the interaction with the SLOP server's tools, *not* the direct API calls to the underlying external service.
* **Limitations:** Mention any known constraints, edge cases, or aspects the provider does not cover (e.g., specific data types not supported, rate limits inherited from external services, features not yet implemented).
Example Snippet (Conceptual for a hypothetical 'Calendar' provider):
### Calendar (`calendar`)
* **Overview:** Provides tools to interact with users' external calendar services (e.g., Google Calendar, Outlook Calendar).
* **Description:**
* **Prerequisites:** Requires the `unipile` provider to be configured and the user to have connected their calendar account via Unipile.
* **Authentication:** Relies on the authentication tokens managed by the `unipile` provider. No separate authentication setup is needed specifically for the `calendar` provider itself, but the underlying Unipile connection must be active.
* **Common Use Cases / Scenarios:**
1. **List Upcoming Events:** Retrieve a list of the user's upcoming calendar events.
* Narrative: A user wants to see their schedule for the next day.
* API Call Sequence:
```
POST /api/tools/calendar.list_events
{
"account_id": "acc_12345", // Obtained via unipile.list_accounts
"start_time": "2025-05-04T00:00:00Z",
"end_time": "2025-05-05T00:00:00Z"
}
```
2. **Create a New Event:** Add a new event to the user's calendar.
* Narrative: Schedule a meeting based on a user request.
* API Call Sequence:
```
POST /api/tools/calendar.create_event
{
"account_id": "acc_12345",
"summary": "Project Sync Meeting",
"start_time": "2025-05-06T14:00:00Z",
"end_time": "2025-05-06T15:00:00Z",
"attendees": ["user@example.com", "colleague@example.com"]
}
```
* **Limitations:** Does not currently support managing recurring events or calendar sharing settings. Relies entirely on the capabilities exposed by the underlying Unipile connection.
* **Tools:**
* `calendar.list_events`: ...
* `calendar.create_event`: ...
* **Webhooks:** None.
* **Events:**
* `calendar.event.created`: Published after a new event is successfully created via the tool.
* **Jobs:** None.
* **Configuration:** None (relies on Unipile configuration).
* **Notes:** Ensure the Unipile account sync is up-to-date for accurate calendar information.
This standardized description block aims to significantly improve the usability and discoverability of provider functionalities.