Usage and Examples¶
🚀 Quick Start¶
This minimal example shows how to launch the Client, initialize a basic API client, and start listening for webhook events.
Go Example: Start a Webhook Listener with API Client Integration
// main.go
package main
import (
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"io/ioutil"
"log"
"net/http"
"strings"
"time"
// Adapt to your project structure
"your_project_path/internal/auth"
"your_project_path/internal/client"
"your_project_path/internal/api"
"your_project_path/internal/models"
)
// WebhookSecretKey is used to verify incoming webhook signatures.
// IMPORTANT: Load this from a secure configuration in a real application!
const WebhookSecretKey = "your-very-secret-webhook-key"
// SimpleAuthProvider is a basic example. For production, use a robust auth mechanism.
type SimpleAuthProvider struct {
token string
}
func NewSimpleAuthProvider(token string) *SimpleAuthProvider {
return &SimpleAuthProvider{token: token}
}
func (s *SimpleAuthProvider) GetAuthHeader() (string, error) {
if s.token == "" {
return "", nil // Or an error indicating no token
}
return "Bearer " + s.token, nil
}
func (s *SimpleAuthProvider) Login(credentials interface{}) error {
// In a real scenario, this would involve an API call to an auth server.
// For this example, we assume the token is pre-set or obtained externally.
log.Println("Login called on SimpleAuthProvider (mocked)")
return nil
}
// Event represents a generic event structure. Adapt as needed.
type Event struct {
Type string `json:"type"`
Payload interface{} `json:"payload"`
// Add other relevant fields like Timestamp, ID, etc.
}
// WebhookHandler processes incoming webhook requests.
func WebhookHandler(eventClient api.EventClientInterface) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// 1. Verify Signature (Basic Example)
// IMPORTANT: Implement robust signature verification based on your provider's documentation.
// This is a conceptual example and might not be suitable for all webhook providers.
// Consider timing attacks with hmac.Equal.
requestBody, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("Error reading request body: %v", err)
http.Error(w, "Cannot read request body", http.StatusInternalServerError)
return
}
// It's important to re-assign r.Body after reading it if it needs to be read again.
r.Body = ioutil.NopCloser(strings.NewReader(string(requestBody))) // Or pass requestBody directly
// Example: X-Signature-256: sha256=...
// signatureHeader := r.Header.Get("X-Hub-Signature-256") // Example for GitHub
// if !verifySignature(requestBody, signatureHeader, WebhookSecretKey) {
// log.Println("Invalid webhook signature")
// http.Error(w, "Invalid signature", http.StatusUnauthorized)
// return
// }
log.Println("Webhook signature verification placeholder - implement for production.")
// 2. Decode Event
var event models.Event // Assuming models.Event is the correct structure
// if err := json.NewDecoder(r.Body).Decode(&event); err != nil { // Use requestBody if r.Body was consumed
if err := json.Unmarshal(requestBody, &event); err != nil {
log.Printf("Error decoding event: %v", err)
http.Error(w, "Invalid event format", http.StatusBadRequest)
return
}
log.Printf("Received event: Type=%s", event.Type)
// 3. Forward Event Asynchronously using the MFO API Client
go func(e models.Event) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // Example timeout
defer cancel()
// The actual SendEvent method might take a more specific request model
// For now, we assume eventData itself is compatible or can be wrapped.
// Let's assume models.Event is the type expected by SendEvent or it's a generic map[string]interface{}
// Ensure the 'event' variable here matches the type expected by SendEvent.
// If SendEvent expects a wrapper like models.CreateEventRequest, create it here.
// For this example, we'll assume models.Event is directly usable or SendEvent is flexible.
_, err := eventClient.SendEvent(ctx, e) // Or wrap 'e' if necessary
if err != nil {
log.Printf("Error forwarding event to MFO server: %v", err)
// Implement retry logic or dead-letter queue for failed forwards if needed
} else {
log.Printf("Event %s forwarded successfully to MFO server.", e.Type)
}
}(event)
w.WriteHeader(http.StatusAccepted) // Acknowledge receipt quickly
}
}
// verifySignature is a placeholder for actual signature verification logic.
// IMPORTANT: Use a constant-time comparison for MACs (e.g., hmac.Equal).
func verifySignature(payload []byte, signatureHeader string, secret string) bool {
if signatureHeader == "" {
return false // Or true if signature is optional, depending on requirements
}
// Example for "sha256=..." format
parts := strings.SplitN(signatureHeader, "=", 2)
if len(parts) != 2 || parts[0] != "sha256" {
return false
}
mac := hmac.New(sha256.New, []byte(secret))
mac.Write(payload)
expectedMAC := hex.EncodeToString(mac.Sum(nil))
return hmac.Equal([]byte(parts[1]), []byte(expectedMAC))
}
func main() {
log.Println("Starting MFO Client Webhook Listener...")
// --- API Client Initialization ---
// 1. Initialize AuthProvider (use a real one for production)
// This token should come from a secure config or auth flow.
authProvider := NewSimpleAuthProvider("your-mfo-api-token")
// For a real application, you'd likely have:
// authService := auth.NewAuth("https://mfo.example.com/api/auth", http.DefaultClient)
// err := authService.Login("user@example.com", "securepassword")
// if err != nil { log.Fatalf("Auth login failed: %v", err) }
// authProvider = authService
// 2. Initialize BaseClient
// Ensure the MFO server URL is correct and from config.
baseClient := client.NewBaseClient(
"https://mfo.example.com/api", // MFO Server API Base URL
&http.Client{Timeout: 30 * time.Second},
authProvider,
)
// 3. Initialize Specialized Clients (e.g., EventClient)
// This assumes NewEventClient and the EventClientInterface are defined as per mfo_client_2_core_concepts.md
eventClient := api.NewEventClient(baseClient)
// You would similarly initialize other clients like ChatClient, ResourceClient, etc.
// chatClient := api.NewChatClient(baseClient)
// --- Webhook Listener Setup ---
// For a real application, load port, path, and secret key from configuration.
http.HandleFunc("/webhook", WebhookHandler(eventClient)) // Pass the initialized client
serverAddr := ":8080"
log.Printf("Listening for webhooks on %s/webhook", serverAddr)
if err := http.ListenAndServe(serverAddr, nil); err != nil {
log.Fatalf("Server failed to start: %v", err)
}
}
Run this application. You can then POST a JSON event to http://localhost:8080/webhook
. Make sure to set the X-Hub-Signature-256
header if you implement the signature check.
For example, using cURL:
curl -X POST http://localhost:8080/webhook \\
-H "Content-Type: application/json" \\
-H "X-Hub-Signature-256: sha256=YOUR_GENERATED_SIGNATURE" \\
-d '{"type": "user_created", "payload": {"id": "user123", "email": "test@example.com"}}'
YOUR_GENERATED_SIGNATURE
using the WebhookSecretKey
and the JSON payload.) This Quick Start provides a more complete, albeit still simplified, foundation. Refer to the Configuration section and other examples for more advanced setups, including loading configurations from files and robust authentication.
⚙️ Configure and Run¶
Effective configuration is key to a robust MFO Client. This involves setting up how the client communicates with the MFO server, handles webhooks, and manages resilience.
1️⃣ Client and Webhook Configuration¶
The main configuration for the client typically resides in a YAML file. This file will define parameters for the API client (like server URLs, timeouts, retry policies) and any webhook listener settings.
YAML Config Example (config.yaml
):
# Configuration for the MFO API Client communication
client:
base_url: "https://api.mfo.example.com/api" # Base URL for MFO server API endpoints
auth_base_url: "https://auth.mfo.example.com/auth" # Base URL for authentication (if different)
timeout: "30s" # Global HTTP request timeout
# API Token for SimpleAuthProvider or similar direct token auth.
# For more complex auth flows (e.g., OAuth2, JWT from login endpoint),
# you might have different fields or handle auth token retrieval dynamically.
auth_token: "your-secure-api-token" # IMPORTANT: Use environment variables or a secret manager for production!
retry:
max_attempts: 3
initial_backoff: "1s"
max_backoff: "30s"
backoff_multiplier: 2.0
circuit_breaker:
failure_threshold: 5
reset_timeout: "1m"
# Configuration for the Webhook Listener component
webhook_listener:
host: "localhost" # Host to bind the listener to
port: "8080" # Port for the webhook listener
path: "/webhook" # Path to listen for incoming webhooks
secret_key: "your-very-secret-webhook-key" # IMPORTANT: For verifying webhook signatures. Use env vars or secret manager.
# Logging configuration (optional)
logging:
level: "info" # e.g., debug, info, warn, error
format: "text" # e.g., text, json
Go Example: Reading Configuration
This Go code demonstrates how to define structs that map to the YAML configuration and a function to load it.
// config/config.go (or a similar package)
package config
import (
"os"
"time"
"gopkg.in/yaml.v3"
)
// Duration is a helper type for unmarshalling time.Duration from strings.
type Duration time.Duration
func (d *Duration) UnmarshalYAML(value *yaml.Node) error {
dur, err := time.ParseDuration(value.Value)
if err != nil {
return err
}
*d = Duration(dur)
return nil
}
// ClientConfig holds settings for the MFO API client communication.
type ClientConfig struct {
BaseURL string `yaml:"base_url"`
AuthBaseURL string `yaml:"auth_base_url,omitempty"`
AuthToken string `yaml:"auth_token,omitempty"` // Used by SimpleAuthProvider, manage securely!
Timeout Duration `yaml:"timeout"`
Retry RetryConfig `yaml:"retry"`
CircuitBreaker CBConfig `yaml:"circuit_breaker"`
}
// RetryConfig defines the retry policy.
type RetryConfig struct {
MaxAttempts int `yaml:"max_attempts"`
InitialBackoff Duration `yaml:"initial_backoff"`
MaxBackoff Duration `yaml:"max_backoff"`
BackoffMultiplier float64 `yaml:"backoff_multiplier"`
}
// CBConfig defines the circuit breaker policy.
type CBConfig struct {
FailureThreshold int `yaml:"failure_threshold"`
ResetTimeout Duration `yaml:"reset_timeout"`
}
// WebhookListenerConfig holds settings for the webhook listener.
type WebhookListenerConfig struct {
Host string `yaml:"host"`
Port string `yaml:"port"`
Path string `yaml:"path"`
SecretKey string `yaml:"secret_key"` // Manage securely!
}
// LoggingConfig holds settings for logging.
type LoggingConfig struct {
Level string `yaml:"level"`
Format string `yaml:"format"`
}
// Config is the top-level configuration structure.
type Config struct {
Client ClientConfig `yaml:"client"`
WebhookListener WebhookListenerConfig `yaml:"webhook_listener"`
Logging LoggingConfig `yaml:"logging,omitempty"`
}
// LoadConfig loads YAML configuration from the given file path.
// It also expands environment variables in the loaded data.
func LoadConfig(filePath string) (*Config, error) {
data, err := os.ReadFile(filePath)
if err != nil {
return nil, err
}
// Expand environment variables
expandedData := os.ExpandEnv(string(data))
var cfg Config
if err := yaml.Unmarshal([]byte(expandedData), &cfg); err != nil {
return nil, err
}
return &cfg, nil
}
Initializing AuthProvider with Configuration:
When you initialize your AuthProvider
(as shown in the Quick Start or the more detailed internal/auth
package), you would use values from the loaded Config
struct. For instance, if using a simple token-based auth:
// In main.go or an initialization function
// cfg, err := config.LoadConfig("config.yaml")
// if err != nil { log.Fatalf("Failed to load config: %v", err) }
// // Example for a simple token provider
// var authProvider auth.AuthProvider
// if cfg.Client.AuthToken != "" {
// authProvider = auth.NewSimpleTokenProvider(cfg.Client.AuthToken)
// } else {
// // Initialize a more complex AuthProvider, e.g., one that performs a login
// authService := auth.NewAuth(cfg.Client.AuthBaseURL, httpClient, cfg.Client.Credentials) // Simplified
// authProvider = authService
// // Potentially call authService.Login() here if needed at startup
// }
// // Then use this authProvider when creating the client.NewBaseClient
// baseClient := client.NewBaseClient(cfg.Client.BaseURL, httpClient, authProvider)
This setup provides a flexible way to manage client behavior and adapt to different MFO server environments and security requirements.
2️⃣ Running a Taskflow¶
The MFO Client can execute complex workflows defined in YAML using its Taskflow engine, which is based on go-taskflow
. This allows for local orchestration of tasks, which can include calls to the MFO server or other operations.
Taskflow YAML Example (sample_flow.yaml
):
This simple flow defines two tasks: one to send a notification and another to log an event after the notification is sent.
name: "NotifyAndLogFlow"
description: "A simple flow to send a notification and then log the event."
tasks:
- id: "send_notification_task"
name: "Send System Notification"
type: "system_notify_task" # Custom task type to be registered
config:
recipient: "admin@example.com"
message_template: "Event {{.eventName}} received for user {{.userID}}."
successors:
- "log_event_task"
- id: "log_event_task"
name: "Log Event Details"
type: "simple_log_task" # Custom task type to be registered
config:
log_level: "INFO"
log_message: "Notification sent for event {{.eventName}}, user {{.userID}}."
dependencies:
- "send_notification_task"
Go Example: Load and Execute Taskflow with FlowExecutor
This example demonstrates initializing the TaskRegistry
, registering custom task factories, creating a FlowExecutor
, and then (conceptually) loading and executing a flow definition.
// main.go (or a relevant package for taskflow execution)
package main
import (
"context" // Using standard context for MFO API calls within tasks
"fmt"
"log"
// Go Taskflow library
gtf "github.com/noneback/go-taskflow"
// Adapt to your project structure for MFO client components
"your_project_path/internal/taskflow" // For MFO-specific TaskRegistry, Builder, FlowExecutor wrappers
"your_project_path/internal/models" // For FlowDefinition, TaskFunc, custom task context if any
// "your_project_path/internal/api" // If tasks need direct access to specialized API clients
)
// --- Example Custom Task Implementations ---
// SimpleLogTaskFactory creates a task that logs a message.
func SimpleLogTaskFactory(config map[string]interface{}) (models.TaskFunc, error) {
logLevel := config["log_level"].(string)
logMessageFormat := config["log_message"].(string)
return func(ctx *gtf.FlowContext) error { // Assuming models.TaskFunc is compatible with gtf.TaskFunc
// Resolve variables from context if message is a template
// For simplicity, we directly use FlowContext API for data passing demonstration
eventName, _ := ctx.Get("eventName").(string)
userID, _ := ctx.Get("userID").(string)
// Basic templating, a real app might use text/template
logMessage := fmt.Sprintf(logMessageFormat, eventName, userID) // Simplified for example
// A more robust way would be to properly parse and execute a template with context data.
// Example using ctx.GetString for safer access with default:
// logMessage := strings.ReplaceAll(logMessageFormat, "{{.eventName}}", ctx.GetString("eventName", "<unknown_event>"))
// logMessage = strings.ReplaceAll(logMessage, "{{.userID}}", ctx.GetString("userID", "<unknown_user>"))
log.Printf("[%s] Task '%s': %s", logLevel, ctx.Task().ID(), logMessage)
ctx.Set("log_output_status", "logged successfully") // Example of setting output
return nil
}, nil
}
// SystemNotifyTaskFactory (Conceptual - simulates sending a notification)
func SystemNotifyTaskFactory(config map[string]interface{}) (models.TaskFunc, error) {
recipient := config["recipient"].(string)
messageTemplate := config["message_template"].(string)
return func(ctx *gtf.FlowContext) error {
// mfoApiClient, _ := ctx.Get("mfoApiClient").(api.ClientInterface) // Get MFO client if needed
eventName, _ := ctx.Get("eventName").(string)
userID, _ := ctx.Get("userID").(string)
// Simplified templating
message := fmt.Sprintf(messageTemplate, eventName, userID)
log.Printf("[INFO] Task '%s': Sending notification to %s: %s", ctx.Task().ID(), recipient, message)
// In a real task, this would use an MFO NotificationClient or similar.
// e.g., mfoApiClient.NotificationClient().CreateNotification(context.Background(), ...)
ctx.Set("notification_status", "sent")
return nil
}, nil
}
func main() { // Assuming this is part of a larger application setup
// 1. Initialize TaskRegistry
// The TaskRegistry maps task type strings (from YAML) to Go factory functions.
registry := taskflow.NewTaskRegistry() // Use your MFO client's TaskRegistry implementation
// 2. Register Custom Task Factories
// These are the Go functions that will be executed for the tasks defined in YAML.
err := registry.RegisterTask("simple_log_task", SimpleLogTaskFactory)
if err != nil {
log.Fatalf("Failed to register 'simple_log_task': %v", err)
}
err = registry.RegisterTask("system_notify_task", SystemNotifyTaskFactory)
if err != nil {
log.Fatalf("Failed to register 'system_notify_task': %v", err)
}
// 3. Initialize FlowExecutor
// The FlowExecutor manages the lifecycle of taskflows, including building them from definitions
// and running them with concurrency control and context management.
// It requires the registry to know how to build tasks.
// It might also take an MFO API client instance to inject into the task execution context.
// mfoApiClient := ... (initialize your main MFO API client as shown in previous sections)
concurrency := 4 // Example: Max 4 tasks can run concurrently within a single flow execution
flowExecutor := taskflow.NewFlowExecutor(registry, concurrency /*, mfoApiClient */) // Pass MFO client if tasks need it
// 4. Load Flow Definition from YAML (Conceptual)
// In a real application, you'd load the YAML file content and parse it into models.FlowDefinition.
// The taskflow.FlowLoader (from MFO client core concepts) would handle this.
// For this example, we'll assume `flowDef` is a populated *models.FlowDefinition struct.
/*
flowDef, err := taskflow.LoadFlowDefinitionFromFile("path/to/your/sample_flow.yaml")
if err != nil {
log.Fatalf("Failed to load flow definition: %v", err)
}
*/
// Placeholder FlowDefinition (normally parsed from the YAML above)
flowDef := &models.FlowDefinition{
Name: "NotifyAndLogFlow",
Tasks: []models.TaskDefinition{
{ID: "send_notification_task", Type: "system_notify_task", Config: map[string]interface{}{"recipient": "admin@example.com", "message_template": "Event {{.eventName}} received for user {{.userID}}."}, Successors: []string{"log_event_task"}},
{ID: "log_event_task", Type: "simple_log_task", Config: map[string]interface{}{"log_level": "INFO", "log_message": "Notification sent for event {{.eventName}}, user {{.userID}}."}, Dependencies: []string{"send_notification_task"}},
},
}
// 5. Prepare Initial Context for the Flow
// This context can pass initial data into the flow (e.g., trigger event details).
// Tasks can access these values and also write their outputs back to the context.
initialContext := map[string]interface{}{
"eventName": "user_signup",
"userID": "usr_12345",
// "mfoApiClient": mfoApiClient, // If injecting client directly, though FlowExecutor might handle this
}
// 6. Execute the Flow using FlowExecutor
// The FlowExecutor handles building the gtf.Flow from the definition and then running it.
// `ExecuteFlow` is a higher-level method specific to the MFO client's taskflow package.
log.Printf("Executing taskflow '%s'...", flowDef.Name)
executionOptions := >f.ExecutionOptions{EnableRecovery: true} // Example options
err = flowExecutor.ExecuteFlow(context.Background(), flowDef, initialContext, executionOptions)
if err != nil {
log.Fatalf("Taskflow '%s' execution failed: %v", flowDef.Name, err)
}
log.Printf("Taskflow '%s' executed successfully.", flowDef.Name)
// The original example `flow.Execute()` is a method on a `gtf.Flow` object.
// The `FlowExecutor` often abstracts this: it first uses a `Builder` (with the `TaskRegistry`)
// to convert the `models.FlowDefinition` into a `gtf.Flow`, and then calls `gtfExecutor.Run(flow, ...)`,
// which is similar to `flow.Execute()` but with more context control from the `gtf.Executor`.
}
Explanation:
TaskRegistry
: This is where you tell the system what Go code to run for a given tasktype
string found in your YAML. Each task type is mapped to a "factory" function that creates an executable task instance.- Task Factories (
SimpleLogTaskFactory
,SystemNotifyTaskFactory
): These are Go functions that take task configuration (from YAML) and return amodels.TaskFunc
. TheTaskFunc
is whatgo-taskflow
actually executes. It receives aFlowContext
which allows it to get/set data and access task details. FlowExecutor
: This is the MFO client's component responsible for the end-to-end process of running a taskflow. It typically:- Takes your
models.FlowDefinition
(parsed from YAML). - Uses an internal
taskflow.Builder
(which in turn uses theTaskRegistry
) to construct an executablegtf.Flow
object. - Manages the execution of this
gtf.Flow
using agtf.Executor
from thego-taskflow
library, handling concurrency, context propagation (including potentially injecting an MFO API client instance for tasks to use), and error handling.
- Takes your
flowExecutor.ExecuteFlow(...)
: This is the primary method you'd call. It encapsulates the building and running of the flow.- Data Passing: Tasks can receive initial data from the
initialContext
passed toExecuteFlow
. They can also pass data to subsequent tasks by setting values in theirFlowContext
(e.g.,ctx.Set("outputKey", value)
). Downstream tasks can then retrieve this (e.g.,ctx.Get("previousTask.outputKey")
or by having the values directly injected if the templating in task configs supports it).
This approach provides a structured and extensible way to define and run local workflows within the MFO Client.
🌀 Example 1: Simple Webhook Flow¶
This shows a basic flow from webhook reception to server forwarding using the structured MFO API client.
Diagram:
The sequence diagram illustrates the interaction: an external system posts a webhook, the MFO Client validates it (conceptually), then uses its API client (e.g., EventClient
) to forward the processed event to the MFO Server.
sequenceDiagram
participant User as External System
participant ClientApp as MFO Client Application
participant EventClient as MFO EventClient (part of ClientApp)
participant MFOServer as MFO Server
User->>+ClientApp: POST /webhook (Raw Event)
ClientApp->>ClientApp: Verify Signature & Parse Event
ClientApp->>+EventClient: Prepare MFO Event (e.g., models.Event)
EventClient->>+MFOServer: POST /api/events (Authenticated Request with MFO Event)
MFOServer-->>-EventClient: API Response (e.g., 200 OK, EventID)
EventClient-->>-ClientApp: Forwarding Result
ClientApp-->>-User: 202 Accepted (or error if initial validation failed)
Go Code Overview:
This simplified handler demonstrates receiving a webhook and initiating the forwarding process. It assumes eventClient
is an initialized api.EventClientInterface
available to the handler (e.g., via dependency injection or closure, as shown in the Quick Start).
// Simplified Webhook Handler (part of your main application or handlers package)
// import (
// "context"
// "encoding/json"
// "log"
// "net/http"
// "time"
// "your_project_path/internal/api" // For EventClientInterface
// "your_project_path/internal/models" // For your Event struct
// )
func SimpleWebhookHandler(eventClient api.EventClientInterface) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// 1. Basic Validation & Parsing (Signature verification should also be done here)
var receivedEvent models.Event // Use your specific event structure from models
if err := json.NewDecoder(r.Body).Decode(&receivedEvent); err != nil {
http.Error(w, "Invalid event payload", http.StatusBadRequest)
log.Printf("Error decoding webhook event: %v", err)
return
}
defer r.Body.Close()
log.Printf("Received webhook for event type: %s", receivedEvent.Type)
// 2. Asynchronously forward the event using the structured API client
go func(eventToForward models.Event) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // Example timeout
defer cancel()
// The eventClient.SendEvent method handles the actual HTTP POST to the MFO server,
// including authentication (via the underlying BaseClient) and error handling.
// The `eventToForward` should match the expected input for `SendEvent`.
response, err := eventClient.SendEvent(ctx, eventToForward)
if err != nil {
log.Printf("Error forwarding event via EventClient: %v", err)
// Add more robust error handling: retry, dead-letter queue, etc.
} else {
log.Printf("Event forwarded successfully by EventClient. MFO Response ID (if any): %s", response.ID) // Assuming response has an ID
}
}(receivedEvent)
// 3. Acknowledge receipt quickly
w.WriteHeader(http.StatusAccepted)
fmt.Fprintln(w, "Event received and is being processed.")
}
}
// In main.go, you would set this up like:
// initialize eventClient as shown in Quick Start or Configuration sections
// http.HandleFunc("/simple-webhook", SimpleWebhookHandler(eventClient))
This example focuses on the client's role in receiving an event and handing it off to a specialized MFO API client (EventClient
) for secure and reliable forwarding. The EventClient
itself (and the BaseClient
it uses) would incorporate the authentication, error handling, and retry logic discussed in Core Concepts.
🧩 Example 2: Complex Workflow (with Subflows)¶
Scenario: A complex event, once received by the MFO Client, needs to trigger a locally orchestrated workflow involving multiple tasks with dependencies. Some of these tasks might need to interact with the MFO server (e.g., to fetch additional data, update a resource, or send a chat message) using the configured MFO API client.
Taskflow YAML (data_pipeline_flow.yaml
):
This YAML defines a multi-step data ingestion pipeline.
name: "DataIngestionPipeline"
description: "A workflow to fetch, transform, store data, and notify."
tasks:
- id: "fetch_external_data"
name: "Fetch Data from External API"
type: "http_get_task" # Custom task type for making HTTP GET requests
config:
url: "https://api.external.com/data/{{.input.dataID}}" # URL can be templated from flow context
timeout: "10s"
successors:
- "transform_fetched_data"
- id: "transform_fetched_data"
name: "Transform Raw Data"
type: "data_transform_task" # Custom task type for data manipulation
dependencies: ["fetch_external_data"]
config:
transformation_rule: "ruleset_v1"
# Input data from fetch_external_data.result (or similar) will be implicitly available
# or explicitly mapped by the task implementation via FlowContext.
successors:
- "store_data_mfo"
- id: "store_data_mfo"
name: "Store Processed Data in MFO Resource"
type: "mfo_resource_create_task" # Task type that uses the MFO API Client
dependencies: ["transform_fetched_data"]
config:
resource_type: "processed_data"
# Data to store comes from transform_fetched_data.output
# Task implementation will use FlowContext to get this data and the MFO client.
successors:
- "notify_completion_mfo"
- id: "notify_completion_mfo"
name: "Notify Team via MFO Chat"
type: "mfo_chat_send_task" # Task type that uses the MFO API Client
dependencies: ["store_data_mfo"]
config:
thread_id: "team_notifications_thread"
message_template: "Data ingestion pipeline for {{.input.dataID}} completed. New resource ID: {{.store_data_mfo.resourceID}}"
# Task implementation will use FlowContext and MFO client.
Diagram:
Illustrates the flow of tasks in the pipeline.
graph TD
A[Webhook Trigger: New Data Available] --> B(Run DataIngestionPipeline Flow);
subgraph DataIngestionPipeline Flow
direction LR
T1[fetch_external_data] --> T2[transform_fetched_data];
T2 --> T3[store_data_mfo (Uses MFO Client)];
T3 --> T4[notify_completion_mfo (Uses MFO Client)];
end
Go Code Overview and API Client Access in Tasks:
The execution of this taskflow would be managed by the FlowExecutor
as shown in the "Running a Taskflow" example. The key aspect here is how tasks like mfo_resource_create_task
or mfo_chat_send_task
access the MFO API client.
-
Client Injection by
FlowExecutor
: When theFlowExecutor
is initialized, it should be provided with a fully configured MFO API client (e.g., an instance ofBaseClientInterface
or a more specific composite client struct containing all specialized clients likeChatClient
,ResourceClient
, etc.).// Conceptual: Initializing FlowExecutor with an MFO API Client // mfoBaseClient := client.NewBaseClient(cfg.Client.BaseURL, httpClient, authProvider) // mfoChatClient := api.NewChatClient(mfoBaseClient) // mfoResourceClient := api.NewResourceClient(mfoBaseClient) // ... other clients ... // taskflowRegistry := taskflow.NewTaskRegistry() // ... register task factories ... // The FlowExecutor can store the MFO client(s) and add them to the FlowContext // for each flow execution, or the factories themselves can close over the client. // flowExecutor := taskflow.NewFlowExecutor(taskflowRegistry, concurrency, mfoBaseClient /* or a struct of clients */)
-
Accessing Client in Task Factories: The task factory for an MFO-interacting task (e.g.,
MfoResourceCreateTaskFactory
) would then retrieve the API client from thegtf.FlowContext
during task execution.// Conceptual: Factory for a task that creates an MFO resource func MfoResourceCreateTaskFactory(config map[string]interface{}) (models.TaskFunc, error) { resourceType := config["resource_type"].(string) return func(ctx *gtf.FlowContext) error { // Retrieve the MFO ResourceClient from the flow context // The key "mfoResourceClient" must match how FlowExecutor provides it. resourceClientVal, ok := ctx.Get("mfoResourceClient") if !ok { log.Printf("Task '%s': MFO ResourceClient not found in context", ctx.Task().ID()) return fmt.Errorf("mfoResourceClient not found in flow context") } mfoResourceClient := resourceClientVal.(api.ResourceClientInterface) // Type assertion // Get data to store from a previous task (example) transformedData, _ := ctx.Get("transform_fetched_data.output").(string) // Key format depends on actual output keys req := models.CreateResourceRequest{ Type: resourceType, Content: transformedData, // Metadata: ... } log.Printf("Task '%s': Creating MFO resource...", ctx.Task().ID()) newResource, err := mfoResourceClient.CreateResource(context.Background(), req) if err != nil { log.Printf("Task '%s': Failed to create MFO resource: %v", ctx.Task().ID(), err) return err // Propagate error to the taskflow engine } log.Printf("Task '%s': MFO resource created with ID: %s", ctx.Task().ID(), newResource.ID) ctx.Set("resourceID", newResource.ID) // Make the new resource ID available to subsequent tasks return nil }, nil }
-
Registration and Execution: This factory (
MfoResourceCreateTaskFactory
) would be registered with theTaskRegistry
against the typemfo_resource_create_task
. The overall flow execution (flowExecutor.ExecuteFlow(...)
) remains the same as previously described.
// Conceptual main execution snippet (building on previous examples)
// func main() {
// // ... (config loading, MFO client initialization, registry setup) ...
// registry.RegisterTask("http_get_task", HttpGetTaskFactory) // Assuming HttpGetTaskFactory is defined
// registry.RegisterTask("data_transform_task", DataTransformTaskFactory) // Assuming DataTransformTaskFactory is defined
// registry.RegisterTask("mfo_resource_create_task", MfoResourceCreateTaskFactory)
// registry.RegisterTask("mfo_chat_send_task", MfoChatSendTaskFactory) // Assuming MfoChatSendTaskFactory is defined
// flowExecutor := taskflow.NewFlowExecutor(registry, 4, mfoClients) // mfoClients could be a struct with all needed clients
// flowDef, err := taskflow.LoadFlowDefinitionFromFile("data_pipeline_flow.yaml")
// if err != nil { log.Fatalf("Failed to load flow: %v", err) }
// initialData := map[string]interface{}{"input": map[string]string{"dataID": "some-data-id-123"}}
// err = flowExecutor.ExecuteFlow(context.Background(), flowDef, initialData, nil)
// if err != nil { log.Printf("Workflow execution error: %v", err) }
// }
This example illustrates how the MFO Client's taskflow engine can orchestrate complex sequences of operations, including tasks that interact with the MFO server via its structured API client. The key is the proper setup of the TaskRegistry
with task factories that can access the API client through the FlowContext
provided by the FlowExecutor
.
Advanced: Using Specialized API Clients in Taskflows¶
While the MFO Client's primary role might be event listening and forwarding, its local taskflow engine can execute more complex workflows. Tasks within these flows might need to interact with the MFO server using various specialized API clients beyond just forwarding events. These clients are typically initialized using a shared BaseClient
instance, which would be configured with authentication and error handling policies.
Assume you have access to an initialized baseClient api.BaseClientInterface
within your task execution context.
1. Using the ResourceClient
¶
The ResourceClient
allows tasks to manage resources (e.g., documents, data objects) on the MFO server.
Example: Creating a Resource
import (
"context"
"log"
"time"
"your_mfo_client_project/internal/api" // Where NewResourceClient and ResourceClientInterface are
"your_mfo_client_project/internal/models" // Where MFO server data models are defined
)
func CreateNewResource(ctx context.Context, baseClient api.BaseClientInterface, resourceType, content string) (*models.Resource, error) {
resourceClient := api.NewResourceClient(baseClient)
req := models.CreateResourceRequest{
Type: resourceType,
Content: content,
Metadata: map[string]interface{}{
"created_by": "mfo-client-taskflow",
"timestamp": time.Now().UTC().Format(time.RFC3339),
},
}
log.Printf("Attempting to create resource of type '%s'...", resourceType)
resource, err := resourceClient.CreateResource(ctx, req)
if err != nil {
log.Printf("Failed to create resource: %v", err)
// Handle error appropriately (e.g., check for *models.APIError)
return nil, err
}
log.Printf("Successfully created resource with ID: %s", resource.ID)
return resource, nil
}
/*
// How you might call this within a task (conceptual):
func MyTask(flowCtx *taskflow.FlowContext) {
baseClient := flowCtx.Get("mfoBaseClient").(api.BaseClientInterface) // Assuming BaseClient is in context
ctx, cancel := context.WithTimeout(flowCtx.Context(), 30*time.Second)
defer cancel()
_, err := CreateNewResource(ctx, baseClient, "document", "This is some important content.")
if err != nil {
// Task-level error handling
}
}
*/
ResourceClient
and using it to create a new resource on the MFO server. The request includes the resource type, content, and some metadata. 2. Using the ChatClient
¶
The ChatClient
can be used to send messages to threads or interact with chat functionalities on the MFO server.
Example: Sending a Chat Message
import (
"context"
"log"
"time"
"your_mfo_client_project/internal/api"
"your_mfo_client_project/internal/models"
)
func SendChatMessage(ctx context.Context, baseClient api.BaseClientInterface, threadID, messageContent string) (*models.ChatResponse, error) {
chatClient := api.NewChatClient(baseClient)
req := models.ChatRequest{
ThreadID: threadID,
Message: messageContent,
Metadata: map[string]interface{}{
"source": "mfo-client-taskflow",
"sent_at": time.Now().UTC().Format(time.RFC3339),
},
}
log.Printf("Attempting to send message to thread '%s'...", threadID)
chatResponse, err := chatClient.SendMessage(ctx, req)
if err != nil {
log.Printf("Failed to send chat message: %v", err)
return nil, err
}
log.Printf("Successfully sent message. Message ID: %s", chatResponse.MessageID)
return chatResponse, nil
}
/*
// How you might call this within a task (conceptual):
func MyNotifyTask(flowCtx *taskflow.FlowContext) {
baseClient := flowCtx.Get("mfoBaseClient").(api.BaseClientInterface)
ctx, cancel := context.WithTimeout(flowCtx.Context(), 15*time.Second)
defer cancel()
_, err := SendChatMessage(ctx, baseClient, "support-thread-123", "Task X has completed successfully.")
if err != nil {
// Task-level error handling
}
}
*/
ChatClient
. 3. Using the JobClient
¶
The JobClient
allows interaction with the job system on the MFO server, for example, to create new jobs or check the status of existing ones.
Example: Creating a Job
import (
"context"
"log"
"time"
"your_mfo_client_project/internal/api"
"your_mfo_client_project/internal/models"
)
func CreateNewJob(ctx context.Context, baseClient api.BaseClientInterface, jobType string, inputData interface{}) (*models.Job, error) {
jobClient := api.NewJobClient(baseClient)
req := models.CreateJobRequest{
Type: jobType,
Input: inputData,
Metadata: map[string]interface{}{
"initiated_by": "mfo-client-taskflow",
"request_time": time.Now().UTC().Format(time.RFC3339),
},
}
log.Printf("Attempting to create job of type '%s'...", jobType)
job, err := jobClient.CreateJob(ctx, req)
if err != nil {
log.Printf("Failed to create job: %v", err)
return nil, err
}
log.Printf("Successfully created job with ID: %s, Status: %s", job.ID, job.Status)
return job, nil
}
/*
// How you might call this within a task (conceptual):
func MyProcessingTask(flowCtx *taskflow.FlowContext) {
baseClient := flowCtx.Get("mfoBaseClient").(api.BaseClientInterface)
ctx, cancel := context.WithTimeout(flowCtx.Context(), 60*time.Second) // Longer timeout for job creation
defer cancel()
jobInput := map[string]interface{}{"data_url": "s3://bucket/path/to/data.csv", "param": "value"};
_, err := CreateNewJob(ctx, baseClient, "data-processing-job", jobInput)
if err != nil {
// Task-level error handling
}
}
*/
These examples provide a starting point for integrating various MFO server interactions directly into your mfo-client
's taskflows. Remember to handle errors robustly and manage context (e.g., timeouts, cancellation) appropriately within your tasks.
🎥 Logs & Monitoring¶
Metaphor: The CCTV System – Every action is monitored and logged for transparency and debugging.
Go Code: Logging Events
log.Printf("Received event: %+v", event)
err := forwardEvent(event)
if err != nil {
log.Printf("Forwarding failed: %v", err)
} else {
log.Println("Event successfully forwarded to MFO server")
}
Typical Logs:
2025/05/09 12:00:00 Received event: {Type: user_signup Data: {...}}
2025/05/09 12:00:00 Event successfully forwarded to MFO server
Use tools like Prometheus or Grafana for deeper monitoring of request counts, failures, and latencies.
By following these examples, you'll have a fully functional Client setup, from simple webhook forwarding to complex, orchestrated workflows.