Core Concepts for Providers¶
Provider Architecture¶
Providers in the SLOP Server are designed as pluggable, autonomous modules that extend the server's core functionality. They act as bridges to external services or implement specific internal capabilities.
Key Characteristics:¶
- Modularity: Each provider encapsulates a specific domain or service integration.
- Autonomy: Providers manage their own resources and state with minimal dependence on other providers (See Provider Autonomy).
- Registration: Providers register themselves with the SLOP server during initialization, making their capabilities discoverable.
- Interface Implementation: Providers must implement the providers.Provider interface defined in internal/providers/interfaces.go (a hedged sketch follows this list).
- Capabilities: Providers expose functionality through:
- Tools: Discrete functions callable via the API or by LLMs.
- Webhooks: Endpoints to receive notifications from external services.
- Events: Publishing internal messages via a Pub/Sub system.
- Jobs: Defining and executing background tasks.
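For orientation, here is a hedged sketch of what the providers.Provider interface plausibly includes, inferred from the methods referenced throughout this document; the authoritative definition lives in internal/providers/interfaces.go:

// Hypothetical sketch only - consult internal/providers/interfaces.go
// for the real interface definition.
type Provider interface {
    // Initialize prepares the provider before the server accepts requests.
    Initialize(ctx context.Context) error
    // ExecuteTool routes a tool invocation to the provider by tool ID.
    ExecuteTool(ctx context.Context, toolID string, params map[string]interface{}) (interface{}, error)
}

Optional capabilities (tools, webhooks, jobs, event definitions) are added through separate registration interfaces described later in this document.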
Typical Structure:¶
A provider package (pkg/providers/<provider_name>/) usually contains:
- adapter.go: The core implementation of the providers.Provider interface, handling registration and interaction with the SLOP server.
- client.go: (Optional) Code for interacting with external APIs or services.
- models/: (Optional) Go structs defining data structures specific to the provider.
- services/: (Optional) Service definitions, each declaring the tool capabilities associated with it.
- operations.go / services/: (Optional) Implementation of the provider's core logic and tool functions.
- *_webhook_handler.go: (Optional) Handlers for incoming webhooks.
- *_job.go: (Optional) Definitions and handlers for background jobs.
- store.go / pg_store.go: (Optional) Interface and implementation for provider-specific data persistence.
- config.go: (Optional) Loading and managing provider-specific configuration.
graph TD
subgraph SLOP Server Core
direction LR
A[API Layer] --> B{Provider Registry};
B --> C[Tool Registry];
B --> D[Webhook Registry];
B --> E[Job Manager];
B --> F[Pub/Sub System];
B --> G[Resource Manager];
end
subgraph "Provider Module (pkg/providers/my_provider)"
direction TB
H(adapter.go) -- Implements --> I{providers.Provider Interface};
H -- Registers --> C;
H -- Registers --> D;
H -- Registers --> E;
H -- Publishes --> F;
H -- Uses --> G;
H -- Uses --> J[client.go];
H -- Uses --> K[operations/services];
H -- Uses --> L[store.go];
H -- Uses --> M[config.go];
J -- Interacts --> N[External Service API];
end
I -- Defines Methods --> H;
A -- Executes Tool --> H;
N -- Sends Webhook --> D;
F -- Notifies --> E;
This architecture promotes separation of concerns and allows the SLOP server to be extended without modifying its core components.
Choose a unique name for your provider package under pkg/providers/, following Go package naming conventions:
- All lowercase
- A single word (no underscores or hyphens)
- Short and concise, but clear and descriptive
- No spaces or special characters
Valid characters for Go package names are:
- Lowercase letters (a-z)
- Numbers (0-9), though the name cannot start with a number
- The underscore (_) is technically allowed but discouraged in standard Go style
Examples of valid package names for your structure:
- pkg/providers/aws
- pkg/providers/gcp
- pkg/providers/azure
- pkg/providers/mongo
- pkg/providers/redis
- pkg/providers/postgres
- pkg/providers/auth0
Examples of invalid package names:
- pkg/providers/AWS (uppercase not allowed)
- pkg/providers/aws-client (hyphens not allowed)
- pkg/providers/aws_client (underscores technically allowed but discouraged)
- pkg/providers/aws.client (periods not allowed)
- pkg/providers/1redis (cannot start with a number)
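For example, a provider living in pkg/providers/aws declares its package to match the directory name:

// pkg/providers/aws/adapter.go
package aws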
Provider Autonomy¶
Maintaining autonomy between providers is a core principle in the SLOP Server architecture. This ensures that providers are decoupled, modular, and easier to maintain and update independently.
Guideline: Provider packages (e.g., pkg/providers/unipile) MUST remain autonomous and decoupled from one another.
Core Principles:¶
- Scoped Modifications: When modifying (editing or creating) a provider package pkg/providers/A, you may only change files within pkg/providers/A. You must not modify anything outside that scope, especially internal/; use the existing interfaces and structure provided.
- No Direct Cross-Provider Imports: A provider package (e.g., pkg/providers/A) MUST NOT directly import another specific provider package (e.g., pkg/providers/B), with the sole exception of that provider's models package (pkg/providers/B/models). A models package (pkg/providers/B/models) must not itself import pkg/providers/B or any other pkg/providers/... package.
- No Direct Cross-Provider Calls: A provider package MUST NOT directly call functions or methods defined in another specific provider package.
- Communication via Abstraction: Interaction between providers, or triggering actions in one provider based on an event in another, MUST occur through abstracted mechanisms:
  - Pub/Sub: Providers should publish generic events (e.g., providerA.entity.created) to the central pub/sub system. Other components or dedicated listeners (potentially outside the provider packages) can subscribe to these events and trigger subsequent actions (such as creating a job for another provider).
  - Job Manager: Workflow orchestration should be handled by creating jobs via the central jobmanager, triggered by event listeners or external orchestrators, not directly by another provider.
  - Resource Manager: Providers can interact with shared data by creating/reading resources via the central resources manager. Event payloads can reference resource IDs.
- Shared Models/Interfaces: Providers MAY import shared, generic packages such as internal core types and interfaces (internal/providers, internal/core, internal/jobmanager, internal/resources, internal/pubsub), or common model definitions if necessary, but NOT specific implementations of other providers.
Rationale:¶
- Reduced Complexity: Prevents tangled dependencies and makes the system easier to understand.
- Improved Maintainability: Changes within one provider are less likely to break others.
- Enhanced Testability: Providers can be tested in isolation more effectively.
- Flexibility: Allows providers to be added, removed, or replaced with minimal impact on the rest of the system.
Example Violation (Incorrect):
// pkg/providers/providerA/adapter.go
package providerA

import (
    // ... other imports
    providerB "gitlab.com/webigniter/slop-server/pkg/providers/providerB" // <-- VIOLATION: Direct import

    // valid
    BModels "gitlab.com/webigniter/slop-server/pkg/providers/providerB/models" // <-- VALID only if models is a standalone package
)

func (a *AdapterA) HandleSomeEvent(...) error {
    // ... process event ...

    // Directly call a function in Provider B
    err := providerB.DoSomethingSpecific(...) // <-- VIOLATION: Direct call
    // ...
}
Example Correct Implementation (Using Pub/Sub and Jobs):
// pkg/providers/providerA/adapter.go
package providerA

import (
    "context"
    "log"

    "gitlab.com/webigniter/slop-server/internal/pubsub" // a.notifier is a pubsub.Publisher
    // NO import for providerB
)

const EventProviderACompleted = "providerA.task.completed"

func (a *AdapterA) HandleSomeEvent(ctx context.Context, ...) error {
    // ... process event ...
    resultData := map[string]interface{}{"key": "value"}

    // Publish a generic event
    if err := a.notifier.Publish(ctx, EventProviderACompleted, resultData); err != nil {
        log.Printf("ERROR publishing event %s: %v", EventProviderACompleted, err)
    }

    // DO NOT call Provider B directly.
    return nil
}
// --- Elsewhere (e.g., internal/listeners/listeners.go or via Event Trigger Config) ---
import (
    "gitlab.com/webigniter/slop-server/internal/jobmanager"
    // Potentially import providerB's job registration details if needed
)

func handleProviderACompletion(ctx context.Context, eventPayload map[string]interface{}) {
    // Extract necessary info from eventPayload

    // Get Job Manager instance (dependency injection)
    jobMgr := getJobManagerFromContext(ctx)

    // Schedule a job defined by Provider B
    jobParams := map[string]interface{}{ /* derived from eventPayload */ }
    job := jobmanager.Job{Type: "providerB.process_data", Parameters: jobParams}
    _, err := jobMgr.ScheduleJob(ctx, job)
    if err != nil { /* ... handle error ... */ }
}
By adhering to these principles, the SLOP Server maintains a clean, scalable, and robust architecture.
Provider Registration¶
For a provider to be recognized and utilized by the SLOP Server, it must be registered during the server's initialization phase. This process makes the provider's capabilities (tools, webhooks, jobs, events) available to the rest of the system.
Mechanism:
- Implementation: The provider must implement the providers.Provider interface.
- Instantiation: A factory function (e.g., NewProvider()) is typically defined within the provider's package to create an instance of the provider's main struct (adapter).
- Registration Call: The core of the registration happens in the pkg/providers/providers.go file. An init() function within this package calls providers.RegisterProvider() for each provider that should be included in the server build; a hedged sketch follows this list.
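A minimal sketch of that registration file, assuming RegisterProvider() is exposed by the pkg/providers package itself (as the providers.RegisterProvider() call above suggests) and accepts a provider instance; the myprovider import path and constructor are hypothetical:

// pkg/providers/providers.go - hedged sketch; the actual
// RegisterProvider signature may differ.
package providers

import (
    myprovider "gitlab.com/webigniter/slop-server/pkg/providers/myprovider"
)

func init() {
    // Include each provider that should ship with this server build.
    RegisterProvider(myprovider.NewProvider())
}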
Steps for Developers:
- Create Provider Instance: Ensure your provider package has a constructor function (e.g., func NewMyProvider() *MyProvider).
- Special client providers (pkg/providers/client): the client provider declares special tools that the client manages and invokes internally (called like a tool within a function call). Instead of calling the SLOP server API like other tools, such a tool calls a local function, for example ask_user, which halts a workflow to interact with the user and gather more information.
Initialization Sequence:
When the SLOP server starts:
- The server calls GetAllProviders(), which initializes all providers.
- The central providers.Registry stores instances of each registered provider.
- During server setup, the core system iterates through the registered providers (a hedged sketch of this loop appears below):
  - Calls the Initialize() method on each provider.
  - Calls RegisterTools(), RegisterWebhooks(), RegisterJobs(), and RegisterEventDefinitions() (if implemented) on each provider, passing the respective registries (ToolRegistry, WebhookRegistry, JobRegistry, EventRegistry).
This ensures that all provider capabilities are properly configured and made available before the server starts accepting requests.
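A hedged sketch of that iteration, illustrative only; the registry variable names are assumptions, and the optional-capability interfaces are those described elsewhere in this document:

// Hypothetical server setup loop (not the actual SLOP core code).
for _, p := range registry.GetAllProviders() {
    if err := p.Initialize(ctx); err != nil {
        return fmt.Errorf("failed to initialize provider: %w", err)
    }
    // Optional capabilities are registered only if the provider implements them.
    if jp, ok := p.(providers.JobRegisteringProvider); ok {
        if err := jp.RegisterJobs(jobRegistry); err != nil {
            return err
        }
    }
    // ... likewise for RegisterTools, RegisterWebhooks, and RegisterEventDefinitions ...
}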
Tools¶
Tools are discrete functions exposed by providers that allow users, LLMs, or other system components to interact with the provider's capabilities. They form a primary mechanism for extending SLOP server functionality.
Purpose:
- To provide specific, callable actions related to the provider's domain (e.g., unipile.get_email, filesystem.write_file).
- To enable integration with LLMs, which can select and execute tools based on user requests.
- To offer a structured way to interact with provider services via the SLOP API.
Definition:
Tools are defined within the provider using the providers.Tool struct. Key fields include:
- ID: A unique identifier for the tool, typically following the format {ProviderName}.{tool_name} (e.g., my_provider.example_tool).
- Name: A human-readable name for the tool.
- Description: A clear explanation of what the tool does, intended for both developers and LLMs.
- Provider: The name of the provider offering this tool.
- Parameters: Holds the definition of the parameters the tool accepts. This field is defined in the providers.ProviderCapability struct as map[string]providers.Parameter. Each key is the parameter name, and the value is a providers.Parameter struct detailing its type, description, requirement status, etc. The provider constructs this map within its GetCapabilities() method.
Tool Parameters - Actual Structure:
The Parameters field is defined as map[string]providers.Parameter within the providers.ProviderCapability struct. This map holds the definitions for each parameter the tool accepts. The provider constructs it within its GetCapabilities() method for each capability.
Example Parameter Definition (using map[string]providers.Parameter):
When defining a capability internally, the provider constructs the Parameters map like this, using the providers.Parameter struct:
// Example within a provider's GetCapabilities() method
func (s *SomeService) GetCapabilities() []providers.ProviderCapability {
    return []providers.ProviderCapability{
        {
            Name:        "example_tool",
            Description: "An example tool description.",
            Category:    "Example Category",
            Parameters: map[string]providers.Parameter{
                "param1": {
                    Type:        "string",
                    Description: "Description of the first parameter.",
                    Required:    true,
                    Category:    "path", // or "query", "body", etc.
                },
                "param2": {
                    Type:        "integer",
                    Description: "Description of the second parameter (optional).",
                    Required:    false,
                    Category:    "query",
                },
                // Add more parameters as needed
            },
            // Examples: ... (Optional)
        },
        // ... other capabilities
    }
}
This map (or a similar structure) is then assigned to the Parameters field of the providers.Tool struct when the tool is registered.
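Based on how the map is populated above, the providers.Parameter struct plausibly looks like the following; this is a hedged reconstruction inferred from usage, so check internal/providers/interfaces.go for the real field set:

// Hypothetical shape of providers.Parameter, inferred from the example above.
type Parameter struct {
    Type        string // e.g., "string", "integer"
    Description string
    Required    bool
    Category    string // e.g., "path", "query", "body"
}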
Registration:
Providers register their tools with the central ToolRegistry during the server initialization sequence by implementing the RegisterTools method:
// Example Registration using the actual pattern (based on unipile)
func (p *MyProvider) RegisterTools(registry providers.ToolRegistry) error {
    // Define the parameter schema (e.g., as a map)
    exampleToolParams := map[string]interface{}{
        "type": "object",
        "properties": map[string]interface{}{
            "param1": map[string]interface{}{
                "type":        "string",
                "description": "First parameter",
            },
        },
        "required": []string{"param1"},
    }

    // Register the tool, assigning the schema map to the Parameters field
    err := registry.RegisterTool(providers.Tool{
        ID:          fmt.Sprintf("%s.example_tool", ProviderName),
        Name:        "Example Tool",
        Description: "An example tool demonstrating correct parameter definition.",
        Provider:    ProviderName,
        Parameters:  exampleToolParams, // Assign the map directly
        // Examples: []interface{}{...} // Optionally add examples
    })
    if err != nil {
        return fmt.Errorf("failed to register tool: %w", err)
    }

    // ... register other tools ...
    return nil
}
Execution:
Tool execution is handled by the provider's ExecuteTool method. The SLOP server routes execution requests to the appropriate provider based on the tool ID.
// Example from Provider Development Guide
func (p *MyProvider) ExecuteTool(ctx context.Context, toolID string, params map[string]interface{}) (interface{}, error) {
    switch toolID {
    case fmt.Sprintf("%s.example_tool", ProviderName):
        // Extract and validate parameters from the 'params' map
        param1, ok := params["param1"].(string)
        if !ok {
            return nil, fmt.Errorf("parameter 'param1' must be a string")
        }
        // ... implement tool logic using param1 ...
        result := map[string]interface{}{"message": "Success!"}
        return result, nil
    default:
        return nil, fmt.Errorf("unknown tool ID: %s", toolID)
    }
}
API Access:
- Discovery (Public):
  - GET /tools: Lists all registered tools.
  - GET /tools/:tool_id: Retrieves the detailed definition (including parameters/schema) of a specific tool.
- Execution (Private - Requires Auth):
  - POST /api/tools/:tool_id: Executes the specified tool with the arguments provided in the request body (see the example request below).
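For instance, executing the example tool registered above might look like the following request; the tool ID and arguments are illustrative:

POST /api/tools/my_provider.example_tool
{
  "param1": "some value"
}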
Tools are fundamental building blocks for provider functionality, enabling both automated processes and interactive use cases within the SLOP ecosystem.
Webhooks (Incoming)¶
Webhooks provide a mechanism for external services to send asynchronous notifications or data to the SLOP Server. Providers can register specific endpoints to listen for these incoming requests.
Purpose:
- To allow external systems (e.g., SaaS platforms, Git repositories, payment gateways) to push information to SLOP in real-time or near real-time.
- To trigger internal workflows or data processing based on external events.
Mechanism:
- External Service Configuration: The external service is configured to send HTTP POST requests to a specific URL provided by the SLOP server.
- Provider Implementation: The provider implements the providers.WebhookRegisteringProvider interface.
- Registration: During initialization, the provider registers a specific path (e.g., /webhooks/{provider}/{webhook_id}) and a corresponding handler function using the providers.WebhookRegistry.
- Handling: When an HTTP request hits a registered webhook path, the SLOP server routes it to the provider's designated handler function.
- Processing: The handler function is responsible for:
  - Reading and parsing the request body (often JSON, but formats vary).
  - Validating the payload (and potentially verifying signatures, if implemented).
  - Performing minimal immediate processing.
  - Crucially: for any non-trivial processing, the handler should schedule a background job using the JobManager to avoid blocking the webhook response and to ensure reliability. The job can be passed the necessary information (such as the payload or a reference to it).
  - Returning an appropriate HTTP status code quickly (e.g., 200 OK for immediate success, 202 Accepted if a job was scheduled).
Webhook Flow Diagram:
sequenceDiagram
participant ExternalService
participant SLOP_Server
participant ProviderWebhookHandler
participant JobManager
participant BackgroundJob
ExternalService->>+SLOP_Server: POST /webhooks/{provider}/{webhook_id} (Payload)
SLOP_Server->>+ProviderWebhookHandler: Invoke handler(request)
ProviderWebhookHandler->>ProviderWebhookHandler: Validate/Parse Payload
alt Minimal Processing (Not Recommended for complex tasks)
ProviderWebhookHandler-->>-SLOP_Server: 2xx OK Response
else Background Processing Needed (Recommended)
ProviderWebhookHandler->>+JobManager: ScheduleJob(JobType, Payload/PayloadRef)
JobManager-->>-ProviderWebhookHandler: JobID
ProviderWebhookHandler-->>-SLOP_Server: 202 Accepted (JobID)
JobManager->>+BackgroundJob: Execute Job(Payload/PayloadRef)
BackgroundJob->>BackgroundJob: Perform processing
BackgroundJob-->>-JobManager: Job Complete/Failed
end
SLOP_Server-->>-ExternalService: Response (2xx)
API Access:
- POST /webhooks/:provider/:webhook_id: The public endpoint where external services send webhook data. The exact path is determined during provider registration; an illustrative request follows below.
Webhooks are essential for integrating SLOP with external systems that initiate communication.
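As an illustration, an external service wired to a hypothetical my_provider webhook would POST to a path of this shape; the webhook ID and payload fields are invented for the example:

POST /webhooks/my_provider/new_item
{
  "event_type": "item.created",
  "item_id": "ext-12345"
}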
Events (Internal Pub/Sub)¶
Beyond receiving external webhooks, providers can publish internal events using the SLOP Server's Pub/Sub (Publish/Subscribe) system. This mechanism facilitates decoupled communication between different parts of the server, including between providers or between providers and core services.
Purpose:
- To notify other components about significant occurrences within a provider (e.g., unipile.email.received, myprovider.document.processed).
- To trigger subsequent actions or workflows in a decoupled manner (see Provider Chaining).
- To enable internal automation via Event Triggers.
- To allow external clients to subscribe to server events via Webhook Subscriptions (though the client interacts with the subscription API, not the internal Pub/Sub directly).
Mechanism:
The internal Pub/Sub system relies on the pubsub.Publisher interface (defined in internal/pubsub/interface.go). Providers that need to publish events should receive an implementation of this interface via dependency injection during their initialization.
Workflow for Publishing Events:
- Dependency: Ensure your provider's adapter struct includes a field for pubsub.Publisher and receives it in its constructor.
- Define Event Channel: Choose a unique, descriptive channel name, typically following the pattern provider_name.resource_type.action (e.g., unipile.account.sync_status.updated, email_draft_preparator.draft.prepared).
- Define Payload Struct: Create a Go struct representing the data for the event. This struct should be serializable to JSON and contain all necessary information for consumers.
- Publish: Call the notifier.Publish(ctx, channelName, payloadStruct) method where the event occurs:

func (a *MyProviderAdapter) completeProcessing(ctx context.Context, docID string, url string) error {
    // ... processing logic ...
    eventPayload := MyProviderDocumentProcessedEvent{
        DocumentID:       docID,
        Status:           "completed",
        ProcessedFileURL: url,
        Timestamp:        time.Now(),
    }
    channel := "myprovider.document.processed"
    log.Printf("Publishing event '%s' for document %s", channel, docID)
    err := a.notifier.Publish(ctx, channel, eventPayload)
    if err != nil {
        log.Printf("ERROR publishing event '%s': %v", channel, err)
        // Decide how to handle publish errors (retry, log, ignore?)
    }
    return err // Or nil, depending on criticality
}
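For completeness, a plausible definition of the MyProviderDocumentProcessedEvent struct used above; the field set mirrors the example, and the JSON tags are assumptions:

// Hypothetical payload struct for the publish example above.
type MyProviderDocumentProcessedEvent struct {
    DocumentID       string    `json:"document_id"`
    Status           string    `json:"status"`
    ProcessedFileURL string    `json:"processed_file_url"`
    Timestamp        time.Time `json:"timestamp"`
}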
Payload Handling:¶
- Serialization: The payload struct passed to Publish must be marshallable to JSON using encoding/json.
- Large Payloads: The system (specifically the internal/pubsub/postgres implementation) automatically handles large payloads. If the marshalled JSON exceeds a certain size (~7.5KB), it is transparently stored using the resources.Manager, and only a resource reference is published internally. Providers do not need to manually check payload size before publishing.
Event Discovery:¶
To make events discoverable and usable by API clients (for webhook subscriptions or triggers), providers should implement the providers.EventInfoProvider interface.
// Interface definition (internal/providers/interfaces.go)
type EventInfoProvider interface {
    // GetEventDefinitions returns basic info about published events.
    GetEventDefinitions() []EventDefinition
    // GetDetailedEventInfo returns detailed info (e.g., schema, examples) about events.
    GetDetailedEventInfo() []DetailedEventInfo
}

// EventDefinition describes an event published by a provider.
type EventDefinition struct {
    Channel        string      `json:"channel"`                  // Unique event channel name (e.g., provider.resource.action)
    Description    string      `json:"description"`              // User-friendly description of the event
    SourceProvider string      `json:"source_provider"`          // ID of the provider publishing the event
    PayloadSchema  interface{} `json:"payload_schema,omitempty"` // Optional: JSON Schema describing the payload structure
}

// DetailedEventInfo provides extended information about an event.
type DetailedEventInfo struct {
    Channel        string      `json:"channel"`
    Description    string      `json:"description"`
    SourceProvider string      `json:"source_provider"`
    PayloadSchema  interface{} `json:"payload_schema,omitempty"`
    // ExamplePayload interface{} `json:"example_payload,omitempty"` // Optional: Example payload
    // Notes          string      `json:"notes,omitempty"`           // Optional: Additional notes
}
// Example Provider Implementation (adapter.go)
var _ providers.EventInfoProvider = (*MyProvider)(nil)

func (p *MyProvider) GetEventDefinitions() []providers.EventDefinition {
    return []providers.EventDefinition{
        {
            Channel:        "myprovider.document.processed",
            Description:    "Triggered when a document has finished processing.",
            SourceProvider: ProviderName,
            // Optionally provide a JSON schema for the payload struct
        },
        // ... other events published by this provider
    }
}

// Example implementation for detailed info (can return nil if not needed)
func (p *MyProvider) GetDetailedEventInfo() []providers.DetailedEventInfo {
    // Return nil or populate detailed info based on EventDefinitions
    return nil
}
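Since PayloadSchema is typed as interface{}, one simple option is to populate it with a JSON Schema expressed as a Go map. A minimal, hypothetical schema for the event above:

// Hypothetical JSON Schema for myprovider.document.processed,
// assignable to EventDefinition.PayloadSchema.
var documentProcessedSchema = map[string]interface{}{
    "type": "object",
    "properties": map[string]interface{}{
        "document_id": map[string]interface{}{"type": "string"},
        "status":      map[string]interface{}{"type": "string"},
    },
    "required": []string{"document_id", "status"},
}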
Registered event definitions can be queried via the authenticated endpoint GET /api/events/definitions. (TODO: confirm the endpoint and whether it uses the detailed info.)
Internal events are a powerful tool for building reactive and decoupled workflows within the SLOP Server.
Jobs (Background Processing)¶
For operations that might take a significant amount of time (e.g., processing large files, interacting with slow external APIs, complex computations), providers should utilize the SLOP Server's Job Manager to execute these tasks asynchronously in the background.
Purpose:
- Prevent Blocking: Avoid tying up API request handlers or webhook listeners while long operations complete.
- Improve Responsiveness: Allow the server to respond quickly to initial requests (e.g., with a 202 Accepted status and a Job ID).
- Enhance Reliability: Jobs can be queued, managed, and potentially retried if they fail.
- Resource Management: Allows the server to manage the execution of resource-intensive tasks more effectively.
Mechanism:¶
The system relies on a central jobmanager.JobManager and associated interfaces:
- jobmanager.JobManager: Interface for scheduling and managing jobs.
- jobmanager.JobRegistry: Interface used during initialization to register job types and their corresponding handler functions.
- providers.JobRegisteringProvider: Interface that providers implement to register their job types.
- jobmanager.Job: Struct representing a job instance, containing its type, parameters, schedule, status, etc.
Workflow:¶
- Define Job Type: Choose a unique string identifier for the job type, typically provider_name.job_action (e.g., my_provider.process_data, email_draft_preparator.process_email).
- Implement Handler: Create a function within the provider that performs the actual work. This function must match the signature func(ctx context.Context, params map[string]interface{}) error.
- Register Job Type: Implement the providers.JobRegisteringProvider interface and use the RegisterJobs method to associate the job type string with its handler function via the JobRegistry.
- Schedule Job: When a long-running task needs to be performed (often triggered by an API call or a webhook handler), obtain an instance of the JobManager (usually via context or dependency injection) and call ScheduleJob(ctx, job). The jobmanager.Job struct passed to this method includes the Type, Parameters (as map[string]interface{}), and the desired Schedule (often time.Now() for immediate queuing).
- Execution: The Job Manager picks up queued jobs and executes their registered handler functions, passing the stored parameters.
Implementation Details:¶
- Registering Job Types (JobRegisteringProvider):

// Interface definition (internal/providers/interfaces.go)
type JobRegisteringProvider interface {
    RegisterJobs(registry jobmanager.JobRegistry) error
}

// Example Provider Implementation (adapter.go)
var _ providers.JobRegisteringProvider = (*MyProvider)(nil)

const JobTypeProcessData = "my_provider.process_data"

func (p *MyProvider) RegisterJobs(registry jobmanager.JobRegistry) error {
    // Link the job type string to its handler function
    err := registry.RegisterJobType(JobTypeProcessData, p.processDataJob)
    if err != nil {
        return fmt.Errorf("failed to register job %s: %w", JobTypeProcessData, err)
    }
    return nil
}
- Implementing Job Handler:

// Example Handler (e.g., jobs.go)
func (p *MyProvider) processDataJob(ctx context.Context, params map[string]interface{}) error {
    // 1. Extract and validate parameters
    dataID, ok := params["data_id"].(string)
    if !ok {
        return fmt.Errorf("missing or invalid 'data_id' parameter")
    }

    // 2. Perform the long-running task
    log.Printf("Starting job %s for data %s", JobTypeProcessData, dataID)
    // ... interact with external services, process data ...
    time.Sleep(30 * time.Second) // Simulate work
    log.Printf("Finished job %s for data %s", JobTypeProcessData, dataID)

    // 3. Return nil on success, or an error on failure
    return nil
}
- Scheduling a Job:

// Example within a Webhook Handler or API Handler
func (p *MyProvider) handleIncomingRequest(w http.ResponseWriter, r *http.Request) {
    // ... parse request, get dataID ...
    dataID := "some-data-identifier"

    // Get JobManager (example: from context)
    jobManager, ok := r.Context().Value(jobmanager.JobManagerKey).(jobmanager.JobManager)
    if !ok {
        // Handle error - JobManager not found
        return
    }

    // Prepare job parameters
    jobParams := map[string]interface{}{"data_id": dataID}

    // Create and schedule the job
    job := jobmanager.Job{
        Type:       JobTypeProcessData,
        Parameters: jobParams,
        Schedule:   time.Now(), // Queue for immediate execution
        // Name, UserID etc. might be set automatically or manually
    }
    jobID, err := jobManager.ScheduleJob(r.Context(), job)
    if err != nil {
        // Handle scheduling error, return 500
        return
    }

    // Respond to the original request
    w.WriteHeader(http.StatusAccepted)
    fmt.Fprintf(w, `{"status":"processing_scheduled", "job_id":"%s"}`, jobID)
}
API Access (Private - Requires Auth):
- POST /api/jobs: Manually create and queue a new background job.
- GET /api/jobs: List background jobs (filterable by status, user).
- GET /api/jobs/:jobID: Get details of a specific background job.
- POST /api/jobs/:jobID/cancel: Request cancellation of a running job.
- GET /api/jobs/:jobID/history: Get the status history of a specific job.
Using the Job Manager is crucial for building robust and scalable providers that handle potentially long-running tasks without impacting server performance.
Storage¶
Providers often need to persist data related to their operation, such as configuration, state, user credentials, or processed results. To maintain autonomy and testability, providers should manage their own storage through dedicated interfaces.
Purpose:
- To store provider-specific data persistently.
- To decouple the provider's logic from the specific storage backend (e.g., PostgreSQL, Redis, in-memory).
- To facilitate easier testing by allowing mock storage implementations.
Mechanism:
- Define Storage Interface: Within the provider's package, define a Go interface (e.g., Store) that specifies the data access methods required by the provider (e.g., SaveItem, GetItem, DeleteItem).
- Implement Interface: Create one or more concrete implementations of the storage interface. A common implementation uses PostgreSQL, often via the database/sql package or libraries like pgx.
- Dependency Injection: The provider's adapter struct should hold an instance of the storage interface. This instance (e.g., *PgStore) is typically created during server initialization (using the shared database connection pool) and injected into the provider's constructor.
- Usage: The provider's methods interact with the database solely through the defined storage interface methods.
Implementation Example:¶
- Define Interface (store.go):

package myprovider

import "context"

// Store defines the storage interface for MyProvider
type Store interface {
    SaveItem(ctx context.Context, item Item) error
    GetItem(ctx context.Context, id string) (*Item, error)
    // ... other methods
}

// Item represents a data item
type Item struct {
    ID   string
    Data map[string]interface{}
    // ... other fields
}
- Implement with PostgreSQL (pg_store.go):

package myprovider

import (
    "context"
    "database/sql" // Or pgxpool.Pool
    "encoding/json"
    "fmt"
)

var _ Store = (*PgStore)(nil) // Compile-time check

type PgStore struct {
    db *sql.DB // Or *pgxpool.Pool
}

func NewPgStore(db *sql.DB) *PgStore {
    return &PgStore{db: db}
}

func (s *PgStore) SaveItem(ctx context.Context, item Item) error {
    dataJSON, err := json.Marshal(item.Data)
    if err != nil {
        return fmt.Errorf("failed to marshal item data: %w", err)
    }
    _, err = s.db.ExecContext(ctx,
        `INSERT INTO my_provider_items (id, data) VALUES ($1, $2)
         ON CONFLICT (id) DO UPDATE SET data = $2`,
        item.ID, dataJSON)
    return err
}

func (s *PgStore) GetItem(ctx context.Context, id string) (*Item, error) {
    // ... query logic using s.db.QueryRowContext ...
}

// ... other methods
- Inject into Adapter (adapter.go):

package myprovider

import (
    "context"
    "database/sql"
)

type MyProvider struct {
    store Store // Use the interface type
    // ... other fields
}

// Constructor receives the concrete store implementation
func NewProvider(db *sql.DB) *MyProvider {
    store := NewPgStore(db) // Create the concrete store
    return &MyProvider{
        store: store, // Inject it
    }
}

func (p *MyProvider) SomeOperation(ctx context.Context, itemID string) error {
    // Use the store via the interface
    item, err := p.store.GetItem(ctx, itemID)
    // ...
}
Database Schema:
Providers requiring database storage are responsible for defining their necessary SQL table schemas. These migrations should be managed as part of the overall SLOP Server database migration strategy.
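The PgStore example above implies a table with id and data columns. A hedged sketch of the corresponding DDL, held as a Go constant so it can be wired into whatever migration tooling the server uses; the column types are assumptions:

// Hypothetical migration DDL for the PgStore example above.
const createMyProviderItemsTable = `
CREATE TABLE IF NOT EXISTS my_provider_items (
    id   TEXT PRIMARY KEY,
    data JSONB NOT NULL
);`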
By abstracting storage operations behind interfaces, providers remain modular and adaptable to different deployment environments or changes in storage technology.
Configuration¶
Providers often require configuration settings to function correctly, such as API keys, service endpoints, timeouts, or feature flags. The SLOP Server provides mechanisms for providers to load and manage these settings.
Purpose:¶
- To allow customization of provider behavior without modifying code.
- To securely manage sensitive information like API keys or credentials.
- To adapt provider functionality to different environments (development, staging, production).
Common Approaches:¶
- Environment Variables: The most common and recommended method, especially for sensitive data or settings that vary across environments. Environment variables are easily managed in containerized deployments and CI/CD pipelines.
- Configuration Files: Less common for individual providers in the SLOP context, as the main server configuration might handle broader settings. If used, providers might load specific sections from a central configuration file managed by the server.
- Database: For dynamic configuration that needs to be updated without restarting the server (though this is less typical for initial setup parameters like API keys).
Implementation Strategy:¶
- Define Config Struct: Create a Go struct within the provider package to hold all its configuration parameters.
- Loading Function: Implement a function (e.g., LoadConfig()) that reads values from the environment (or other sources) and populates the config struct. This function should handle defaults and validation.
- Initialization: Call the LoadConfig() function during the provider's initialization phase (e.g., within the NewProvider constructor or the Initialize method). The loaded config struct can then be stored within the provider's adapter struct.
Implementation Example:¶
- Define Config Struct (config.go):

package myprovider

import (
    "fmt"
    "os"
    "strconv"
    "time"
)

// Config holds the provider configuration
type Config struct {
    APIKey      string
    APIEndpoint string
    Timeout     time.Duration
    MaxRetries  int
}

// LoadConfig loads configuration from environment variables
func LoadConfig() (*Config, error) {
    apiKey := os.Getenv("MY_PROVIDER_API_KEY")
    if apiKey == "" {
        return nil, fmt.Errorf("MY_PROVIDER_API_KEY environment variable is required")
    }

    endpoint := os.Getenv("MY_PROVIDER_API_ENDPOINT")
    if endpoint == "" {
        endpoint = "https://api.example.com/v1" // Default value
    }

    timeoutStr := os.Getenv("MY_PROVIDER_TIMEOUT_SECONDS")
    timeout := 30 * time.Second // Default value
    if timeoutStr != "" {
        timeoutSec, err := strconv.Atoi(timeoutStr)
        if err != nil {
            return nil, fmt.Errorf("invalid MY_PROVIDER_TIMEOUT_SECONDS: %w", err)
        }
        timeout = time.Duration(timeoutSec) * time.Second
    }

    retriesStr := os.Getenv("MY_PROVIDER_MAX_RETRIES")
    maxRetries := 3 // Default value
    if retriesStr != "" {
        var err error
        maxRetries, err = strconv.Atoi(retriesStr)
        if err != nil || maxRetries < 0 {
            return nil, fmt.Errorf("invalid MY_PROVIDER_MAX_RETRIES: %s", retriesStr)
        }
    }

    return &Config{
        APIKey:      apiKey,
        APIEndpoint: endpoint,
        Timeout:     timeout,
        MaxRetries:  maxRetries,
    }, nil
}
- Use in Provider (adapter.go):

package myprovider

import (
    "context"
    "fmt"
    "log"
)

type MyProvider struct {
    config *Config
    // ... other fields like store, notifier, etc.
}

func NewProvider( /* dependencies */ ) (*MyProvider, error) {
    cfg, err := LoadConfig()
    if err != nil {
        return nil, fmt.Errorf("failed to load my_provider config: %w", err)
    }
    log.Println("MyProvider configuration loaded successfully.")
    return &MyProvider{
        config: cfg,
        // ... initialize other fields
    }, nil
}

func (p *MyProvider) Initialize(ctx context.Context) error {
    // Use config values if needed during initialization
    log.Printf("Initializing MyProvider with endpoint: %s", p.config.APIEndpoint)
    // ...
    return nil
}

// Other methods can access config via p.config
Best Practices:¶
- Environment Variables: Prefer environment variables for configuration, especially for secrets.
- Defaults: Provide sensible default values for non-critical settings.
- Clear Naming: Use clear and specific names for environment variables (e.g., PROVIDERNAME_SETTING_NAME).
- Validation: Validate configuration values during loading to fail fast if settings are invalid.
- Documentation: Clearly document all required and optional configuration settings for the provider.
Provider Chaining¶
Provider chaining is a powerful pattern within the SLOP Server that allows providers to collaborate by passing information sequentially. An upstream provider might generate an event (often triggered by an external webhook or internal process), which is then consumed by a downstream provider. The downstream provider processes the data and may, in turn, emit its own events for further consumption, creating a processing pipeline.
Purpose:¶
- To build complex workflows by composing modular, single-purpose providers.
- To decouple different stages of a process (e.g., data ingestion, transformation, notification).
- To promote reusability of provider functionalities.
Mechanism:¶
Provider chaining typically relies on the interplay of Webhooks, Events, and Jobs:
- Initiation: An external event triggers a webhook handler in an upstream provider (Provider A), or an internal process in Provider A completes.
- Event Publication: Provider A publishes an internal event via the Pub/Sub system, containing relevant data or references (e.g., a resource ID).
- Event Consumption/Trigger:
- A dedicated listener or an Event Trigger configured in the system detects the event published by Provider A.
- This trigger schedules a background job for a downstream provider (Provider B).
- Job Execution: Provider B's job handler executes.
- Data Fetching (if needed): Provider B might need to fetch additional data, potentially by executing a tool exposed by Provider A (using the Provider Registry) or by accessing a shared resource (using the Resource Manager).
- Processing: Provider B performs its specific processing or transformation task.
- Further Publication (Optional): Provider B might publish its own event upon completion, allowing further chaining with Provider C, and so on.
Example: Unipile -> Email Draft Preparator¶
This chain processes incoming emails:
- Unipile (Provider A):
  - Receives a webhook from an external email service (e.g., Gmail) indicating a new email.
  - The webhook handler stores the raw payload as a resource.
  - Publishes an internal event unipile.email.received containing references such as the account ID, email ID, and the resource ID of the raw payload.
- Event Trigger:
  - An event trigger is configured to listen for unipile.email.received.
  - When the event occurs, the trigger schedules a job of type email_draft_preparator.process_email.
- Email Draft Preparator (Provider B):
  - The process_email job handler executes.
  - It extracts the email ID and account ID from the job parameters (originating from the event payload).
  - It executes the unipile.get_email tool (belonging to Provider A) to fetch the full email details.
  - It processes the email content, potentially converting it to XML or another format.
  - It stores the processed result (e.g., as a new resource).
  - It publishes a new event, such as email.draft.prepared, containing the ID of the processed resource.
Provider Chaining Flow Diagram:¶
graph LR
A[External Event Source] --> B(Provider A: Webhook/Process);
B -- Publishes Event --> C{Internal Pub/Sub};
C -- Event Notification --> D(Event Listener / Trigger);
D -- Schedules Job --> E{Job Manager};
E -- Executes Job --> F(Provider B: Job Handler);
F -- Optional: Executes Tool --> G(Provider A: Tool Execution);
F -- Processes Data --> F;
F -- Optional: Publishes New Event --> C;
subgraph "Example: Unipile -> Email Preparator"
direction LR
ExtEmail[External Email Service] --> UnipileWebhook{Unipile: Webhook Handler};
UnipileWebhook -- Publishes "unipile.email.received" --> PubSubBus[Pub/Sub];
PubSubBus -- Notifies --> Trigger(Event Trigger);
Trigger -- Schedules Job --> JobMgr{Job Manager};
JobMgr -- Executes Job --> EmailPrepJob(Email Preparator: process_email Job);
EmailPrepJob -- Executes "unipile.get_email" --> UnipileTool{Unipile Provider Tool};
EmailPrepJob -- Processes Email --> EmailPrepJob;
EmailPrepJob -- Publishes "email.draft.prepared" --> PubSubBus;
end
This pattern, leveraging the core Pub/Sub and Job Manager systems while respecting Provider Autonomy, enables sophisticated, maintainable workflows within the SLOP Server.
Provider Descriptions (New Feature)¶
To aid developers and potentially LLMs in understanding and utilizing providers effectively, each provider's reference section should include a standardized description block. This block provides essential context beyond the basic tool and event listings.
Purpose:
- To offer a quick overview of the provider's function and scope.
- To clearly state any prerequisites needed before the provider can be used.
- To explain the authentication mechanisms required, if any.
- To illustrate typical usage patterns through common scenarios.
- To highlight known limitations or important considerations.
Standard Structure:
Each provider's reference section (under Provider Reference) should contain a subsection formatted as follows:
* **Overview:** A brief, one or two-sentence summary of the provider's main purpose.
* **Description:**
* **Prerequisites:** List any setup steps, external accounts, configurations (e.g., specific environment variables beyond basic API keys), or other providers that must be in place before this provider can function correctly.
* **Authentication:** Detail how the provider authenticates with external services (e.g., API Keys, OAuth2 flow initiated via a specific tool, credentials stored in config). Explain if user interaction is required for setup.
* **Common Use Cases / Scenarios:** Describe 2-3 typical ways the provider is used. For each scenario:
* Provide a brief narrative description.
* Include a sequence of API calls (using the SLOP Server API, e.g., `POST /api/tools/{tool_id}`) demonstrating the scenario. Show example JSON request bodies for the tool calls.
* Focus on the interaction with the SLOP server's tools, *not* the direct API calls to the underlying external service.
* **Limitations:** Mention any known constraints, edge cases, or aspects the provider does not cover (e.g., specific data types not supported, rate limits inherited from external services, features not yet implemented).
Example Snippet (Conceptual for a hypothetical 'Calendar' provider):
### Calendar (`calendar`)
* **Overview:** Provides tools to interact with users' external calendar services (e.g., Google Calendar, Outlook Calendar).
* **Description:**
* **Prerequisites:** Requires the `unipile` provider to be configured and the user to have connected their calendar account via Unipile.
* **Authentication:** Relies on the authentication tokens managed by the `unipile` provider. No separate authentication setup is needed specifically for the `calendar` provider itself, but the underlying Unipile connection must be active.
* **Common Use Cases / Scenarios:**
1. **List Upcoming Events:** Retrieve a list of the user's upcoming calendar events.
* Narrative: A user wants to see their schedule for the next day.
* API Call Sequence:
```
POST /api/tools/calendar.list_events
{
"account_id": "acc_12345", // Obtained via unipile.list_accounts
"start_time": "2025-05-04T00:00:00Z",
"end_time": "2025-05-05T00:00:00Z"
}
```
2. **Create a New Event:** Add a new event to the user's calendar.
* Narrative: Schedule a meeting based on a user request.
* API Call Sequence:
```
POST /api/tools/calendar.create_event
{
"account_id": "acc_12345",
"summary": "Project Sync Meeting",
"start_time": "2025-05-06T14:00:00Z",
"end_time": "2025-05-06T15:00:00Z",
"attendees": ["user@example.com", "colleague@example.com"]
}
```
* **Limitations:** Does not currently support managing recurring events or calendar sharing settings. Relies entirely on the capabilities exposed by the underlying Unipile connection.
* **Tools:**
* `calendar.list_events`: ...
* `calendar.create_event`: ...
* **Webhooks:** None.
* **Events:**
* `calendar.event.created`: Published after a new event is successfully created via the tool.
* **Jobs:** None.
* **Configuration:** None (relies on Unipile configuration).
* **Notes:** Ensure the Unipile account sync is up-to-date for accurate calendar information.