Skip to content

Usage and Examples


🚀 Quick Start

This minimal example shows how to launch the Client, initialize a basic API client, and start listening for webhook events.

Go Example: Start a Webhook Listener with API Client Integration

// main.go
package main

import (
    "context"
    "crypto/hmac"
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "io/ioutil"
    "log"
    "net/http"
    "strings"
    "time"

    // Adapt to your project structure
    "your_project_path/internal/auth"
    "your_project_path/internal/client"
    "your_project_path/internal/api"
    "your_project_path/internal/models"
)

// WebhookSecretKey is the shared secret intended for verifying incoming
// webhook signatures (see the commented-out verifySignature call in
// WebhookHandler). It is hard-coded only to keep this example self-contained.
// IMPORTANT: Load this from a secure configuration in a real application!
const WebhookSecretKey = "your-very-secret-webhook-key"

// SimpleAuthProvider hands out a fixed bearer token. It exists to keep the
// example short; production code should use a real authentication mechanism.
type SimpleAuthProvider struct {
    token string // static token; an empty value means "no Authorization header"
}

// NewSimpleAuthProvider builds a provider that always serves the given token.
func NewSimpleAuthProvider(token string) *SimpleAuthProvider {
    provider := new(SimpleAuthProvider)
    provider.token = token
    return provider
}

// GetAuthHeader returns the Authorization header value ("Bearer <token>").
// With no token configured it returns an empty string and a nil error.
func (p *SimpleAuthProvider) GetAuthHeader() (string, error) {
    if p.token != "" {
        return "Bearer " + p.token, nil
    }
    return "", nil // Or an error indicating no token
}

// Login is a mock: it only logs that it was called and always succeeds.
// A real provider would exchange the credentials with an auth server here;
// this example assumes the token was supplied to the constructor.
func (p *SimpleAuthProvider) Login(credentials interface{}) error {
    log.Println("Login called on SimpleAuthProvider (mocked)")
    return nil
}


// Event represents a generic event structure. Adapt as needed.
// NOTE(review): WebhookHandler in this example decodes into models.Event, not
// this local type — keep only one of the two definitions in a real project.
type Event struct {
    // Type is read from the "type" JSON field.
    Type    string      `json:"type"`
    // Payload is read from the "payload" JSON field and left untyped.
    Payload interface{} `json:"payload"`
    // Add other relevant fields like Timestamp, ID, etc.
}

// WebhookHandler returns an http.HandlerFunc that accepts a webhook POST,
// decodes the JSON body into a models.Event and forwards it to the MFO server
// through eventClient on a background goroutine.
//
// Responses written by the handler:
//   - 405 Method Not Allowed for any method other than POST
//   - 413 Request Entity Too Large when the declared Content-Length exceeds the limit
//   - 400 Bad Request when the body cannot be read (including bodies that
//     exceed the limit while streaming) or is not valid JSON
//   - 202 Accepted once the event has been handed off for forwarding
//
// The 202 is sent before forwarding finishes, so a forwarding failure is only
// visible in the logs.
func WebhookHandler(eventClient api.EventClientInterface) http.HandlerFunc {
    // maxBodyBytes caps how much of an untrusted request body is buffered in memory.
    const maxBodyBytes = 1 << 20 // 1 MiB; raise it if your provider sends larger payloads

    return func(w http.ResponseWriter, r *http.Request) {
        // Webhooks are delivered as POST; reject everything else up front.
        if r.Method != http.MethodPost {
            w.Header().Set("Allow", http.MethodPost)
            http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
            return
        }
        // Fast rejection for bodies that declare their size. MaxBytesReader
        // below also bounds bodies sent without a Content-Length.
        if r.ContentLength > maxBodyBytes {
            http.Error(w, "Request body too large", http.StatusRequestEntityTooLarge)
            return
        }

        // 1. Read the raw body once. The same bytes feed both signature
        // verification and JSON decoding, so r.Body is not needed again.
        requestBody, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, maxBodyBytes))
        if err != nil {
            log.Printf("Error reading request body: %v", err)
            http.Error(w, "Cannot read request body", http.StatusBadRequest)
            return
        }

        // 2. Verify Signature (Basic Example)
        // IMPORTANT: Implement robust signature verification based on your provider's documentation.
        // This is a conceptual example and might not be suitable for all webhook providers.
        // Example: X-Signature-256: sha256=...
        // signatureHeader := r.Header.Get("X-Hub-Signature-256") // Example for GitHub
        // if !verifySignature(requestBody, signatureHeader, WebhookSecretKey) {
        //  log.Println("Invalid webhook signature")
        //  http.Error(w, "Invalid signature", http.StatusUnauthorized)
        //  return
        // }
        log.Println("Webhook signature verification placeholder - implement for production.")

        // 3. Decode Event
        var event models.Event // Assuming models.Event is the correct structure
        if err := json.Unmarshal(requestBody, &event); err != nil {
            log.Printf("Error decoding event: %v", err)
            http.Error(w, "Invalid event format", http.StatusBadRequest)
            return
        }
        log.Printf("Received event: Type=%s", event.Type)

        // 4. Forward Event Asynchronously using the MFO API Client.
        // The goroutine uses its own context rather than r.Context(), because
        // the request context is cancelled as soon as this handler returns.
        go func(e models.Event) {
            ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // Example timeout
            defer cancel()

            // If SendEvent expects a wrapper such as models.CreateEventRequest,
            // build it here; this example assumes models.Event is accepted directly.
            _, err := eventClient.SendEvent(ctx, e)
            if err != nil {
                log.Printf("Error forwarding event to MFO server: %v", err)
                // Implement retry logic or dead-letter queue for failed forwards if needed
                return
            }
            log.Printf("Event %s forwarded successfully to MFO server.", e.Type)
        }(event)

        w.WriteHeader(http.StatusAccepted) // Acknowledge receipt quickly
    }
}

// verifySignature reports whether signatureHeader carries a valid HMAC-SHA256
// of payload computed with secret.
//
// The header must have the form "sha256=<hex digest>". The digest is decoded
// from hex before comparison, so upper- and lower-case hex digits are both
// accepted. An empty header, a different scheme prefix, or a digest that is
// not valid hex all yield false.
//
// The comparison is done with hmac.Equal on the raw digest bytes, which runs
// in constant time and therefore does not leak how many bytes matched.
func verifySignature(payload []byte, signatureHeader string, secret string) bool {
    const scheme = "sha256="
    if !strings.HasPrefix(signatureHeader, scheme) {
        return false // also covers the empty-header case
    }

    providedMAC, err := hex.DecodeString(signatureHeader[len(scheme):])
    if err != nil {
        return false
    }

    mac := hmac.New(sha256.New, []byte(secret))
    mac.Write(payload) // hash.Hash.Write never returns an error
    return hmac.Equal(providedMAC, mac.Sum(nil))
}


// main wires the example together: it builds the MFO API client chain
// (auth provider -> base client -> event client), registers WebhookHandler on
// /webhook and serves HTTP on :8080 until the server fails.
func main() {
    log.Println("Starting MFO Client Webhook Listener...")

    // --- API Client Initialization ---
    // 1. Initialize AuthProvider (use a real one for production)
    // This token should come from a secure config or auth flow.
    authProvider := NewSimpleAuthProvider("your-mfo-api-token")
    // For a real application, you'd likely have:
    // authService := auth.NewAuth("https://mfo.example.com/api/auth", http.DefaultClient)
    // err := authService.Login("user@example.com", "securepassword")
    // if err != nil { log.Fatalf("Auth login failed: %v", err) }
    // authProvider = authService

    // 2. Initialize BaseClient
    // Ensure the MFO server URL is correct and from config.
    baseClient := client.NewBaseClient(
        "https://mfo.example.com/api", // MFO Server API Base URL
        &http.Client{Timeout: 30 * time.Second},
        authProvider,
    )

    // 3. Initialize Specialized Clients (e.g., EventClient)
    // This assumes NewEventClient and the EventClientInterface are defined as per mfo_client_2_core_concepts.md
    eventClient := api.NewEventClient(baseClient)
    // You would similarly initialize other clients like ChatClient, ResourceClient, etc.
    // chatClient := api.NewChatClient(baseClient)

    // --- Webhook Listener Setup ---
    // For a real application, load port, path, and secret key from configuration.
    // A private mux keeps the listener independent of whatever other packages
    // may have registered on http.DefaultServeMux.
    mux := http.NewServeMux()
    mux.HandleFunc("/webhook", WebhookHandler(eventClient)) // Pass the initialized client

    serverAddr := ":8080"
    // Explicit timeouts: the zero-value http.Server (and http.ListenAndServe)
    // never times out, so a slow or stalled client can hold a connection open
    // indefinitely. The values below are example settings.
    server := &http.Server{
        Addr:              serverAddr,
        Handler:           mux,
        ReadHeaderTimeout: 5 * time.Second,
        ReadTimeout:       15 * time.Second,
        WriteTimeout:      15 * time.Second,
        IdleTimeout:       60 * time.Second,
    }

    log.Printf("Listening for webhooks on %s/webhook", serverAddr)
    if err := server.ListenAndServe(); err != nil {
        log.Fatalf("Server failed to start: %v", err)
    }
}

Run this application. You can then POST a JSON event to http://localhost:8080/webhook. Make sure to set the X-Hub-Signature-256 header if you implement the signature check.

For example, using cURL:

curl -X POST http://localhost:8080/webhook \
-H "Content-Type: application/json" \
-H "X-Hub-Signature-256: sha256=YOUR_GENERATED_SIGNATURE" \
-d '{"type": "user_created", "payload": {"id": "user123", "email": "test@example.com"}}'
(You'll need to generate YOUR_GENERATED_SIGNATURE using the WebhookSecretKey and the JSON payload.)

This Quick Start provides a more complete, albeit still simplified, foundation. Refer to the Configuration section and other examples for more advanced setups, including loading configurations from files and robust authentication.


⚙️ Configure and Run

Effective configuration is key to a robust MFO Client. This involves setting up how the client communicates with the MFO server, handles webhooks, and manages resilience.

1️⃣ Client and Webhook Configuration

The main configuration for the client typically resides in a YAML file. This file will define parameters for the API client (like server URLs, timeouts, retry policies) and any webhook listener settings.

YAML Config Example (config.yaml):

# Configuration for the MFO API Client communication
client:
  base_url: "https://api.mfo.example.com/api" # Base URL for MFO server API endpoints
  auth_base_url: "https://auth.mfo.example.com/auth" # Base URL for authentication (if different)
  timeout: "30s"               # Global HTTP request timeout

  # API Token for SimpleAuthProvider or similar direct token auth.
  # For more complex auth flows (e.g., OAuth2, JWT from login endpoint),
  # you might have different fields or handle auth token retrieval dynamically.
  auth_token: "your-secure-api-token" # IMPORTANT: Use environment variables or a secret manager for production!

  retry:
    max_attempts: 3
    initial_backoff: "1s"
    max_backoff: "30s"
    backoff_multiplier: 2.0

  circuit_breaker:
    failure_threshold: 5
    reset_timeout: "1m"

# Configuration for the Webhook Listener component
webhook_listener:
  host: "localhost"            # Host to bind the listener to
  port: "8080"                 # Port for the webhook listener
  path: "/webhook"             # Path to listen for incoming webhooks
  secret_key: "your-very-secret-webhook-key" # IMPORTANT: For verifying webhook signatures. Use env vars or secret manager.

# Logging configuration (optional)
logging:
  level: "info" # e.g., debug, info, warn, error
  format: "text" # e.g., text, json

Go Example: Reading Configuration

This Go code demonstrates how to define structs that map to the YAML configuration and a function to load it.

// config/config.go (or a similar package)
package config

import (
    "os"
    "time"

    "gopkg.in/yaml.v3"
)

// Duration wraps time.Duration so that YAML values written as strings
// (for example "30s" or "1m") can be decoded into it.
type Duration time.Duration

// UnmarshalYAML parses the node's text with time.ParseDuration and stores the
// result in d. On a parse failure the error is returned and d is left untouched.
func (d *Duration) UnmarshalYAML(value *yaml.Node) error {
    parsed, parseErr := time.ParseDuration(value.Value)
    if parseErr == nil {
        *d = Duration(parsed)
    }
    return parseErr
}

// ClientConfig holds settings for the MFO API client communication.
// It maps the "client" section of the YAML configuration.
type ClientConfig struct {
    // BaseURL is the base URL for MFO server API endpoints.
    BaseURL           string        `yaml:"base_url"`
    // AuthBaseURL is the base URL for authentication, if different from BaseURL. Optional.
    AuthBaseURL       string        `yaml:"auth_base_url,omitempty"`
    // AuthToken is an API token for direct token auth. Optional.
    AuthToken         string        `yaml:"auth_token,omitempty"` // Used by SimpleAuthProvider, manage securely!
    // Timeout is the global HTTP request timeout, written as a duration string such as "30s".
    Timeout           Duration      `yaml:"timeout"`
    // Retry is the "retry" subsection.
    Retry             RetryConfig   `yaml:"retry"`
    // CircuitBreaker is the "circuit_breaker" subsection.
    CircuitBreaker    CBConfig      `yaml:"circuit_breaker"`
}

// RetryConfig defines the retry policy.
// It maps the "client.retry" section of the YAML configuration; how the values
// are applied is up to the client implementation, which is not shown here.
type RetryConfig struct {
    // MaxAttempts maps "max_attempts" (3 in the sample config).
    MaxAttempts       int      `yaml:"max_attempts"`
    // InitialBackoff maps "initial_backoff", a duration string such as "1s".
    InitialBackoff    Duration `yaml:"initial_backoff"`
    // MaxBackoff maps "max_backoff", a duration string such as "30s".
    MaxBackoff        Duration `yaml:"max_backoff"`
    // BackoffMultiplier maps "backoff_multiplier" (2.0 in the sample config).
    BackoffMultiplier float64  `yaml:"backoff_multiplier"`
}

// CBConfig defines the circuit breaker policy.
// It maps the "client.circuit_breaker" section of the YAML configuration.
type CBConfig struct {
    // FailureThreshold maps "failure_threshold" (5 in the sample config).
    FailureThreshold  int      `yaml:"failure_threshold"`
    // ResetTimeout maps "reset_timeout", a duration string such as "1m".
    ResetTimeout      Duration `yaml:"reset_timeout"`
}

// WebhookListenerConfig holds settings for the webhook listener.
// It maps the "webhook_listener" section of the YAML configuration.
type WebhookListenerConfig struct {
    // Host is the host to bind the listener to.
    Host      string `yaml:"host"`
    // Port is the listener port, kept as a string (e.g. "8080").
    Port      string `yaml:"port"`
    // Path is the URL path that receives incoming webhooks (e.g. "/webhook").
    Path      string `yaml:"path"`
    // SecretKey is the secret for verifying webhook signatures.
    SecretKey string `yaml:"secret_key"` // Manage securely!
}

// LoggingConfig holds settings for logging.
// It maps the optional "logging" section of the YAML configuration.
type LoggingConfig struct {
    // Level is the log level; the sample config lists debug, info, warn, error.
    Level  string `yaml:"level"`
    // Format is the log output format; the sample config lists text and json.
    Format string `yaml:"format"`
}

// Config is the top-level configuration structure.
// Each field corresponds to one top-level key of the YAML file.
type Config struct {
    // Client holds the API client settings ("client").
    Client           ClientConfig          `yaml:"client"`
    // WebhookListener holds the listener settings ("webhook_listener").
    WebhookListener  WebhookListenerConfig `yaml:"webhook_listener"`
    // Logging holds the logging settings ("logging"); the section is optional.
    Logging          LoggingConfig         `yaml:"logging,omitempty"`
}

// LoadConfig reads the YAML file at filePath and decodes it into a Config.
//
// Before decoding, the raw file text is passed through os.ExpandEnv, so every
// $VAR / ${VAR} reference in the file is replaced by the value of that
// environment variable (references to unset variables become empty strings).
// Read and decode errors are returned unchanged.
func LoadConfig(filePath string) (*Config, error) {
    raw, readErr := os.ReadFile(filePath)
    if readErr != nil {
        return nil, readErr
    }

    // Environment substitution happens on the text, before YAML parsing.
    expanded := []byte(os.ExpandEnv(string(raw)))

    cfg := new(Config)
    if decodeErr := yaml.Unmarshal(expanded, cfg); decodeErr != nil {
        return nil, decodeErr
    }
    return cfg, nil
}

Initializing AuthProvider with Configuration:

When you initialize your AuthProvider (as shown in the Quick Start or the more detailed internal/auth package), you would use values from the loaded Config struct. For instance, if using a simple token-based auth:

// In main.go or an initialization function

// cfg, err := config.LoadConfig("config.yaml")
// if err != nil { log.Fatalf("Failed to load config: %v", err) }

// // Example for a simple token provider
// var authProvider auth.AuthProvider
// if cfg.Client.AuthToken != "" {
//  authProvider = auth.NewSimpleTokenProvider(cfg.Client.AuthToken)
// } else {
//  // Initialize a more complex AuthProvider, e.g., one that performs a login
//  authService := auth.NewAuth(cfg.Client.AuthBaseURL, httpClient, cfg.Client.Credentials) // Simplified; Credentials is not a field of the ClientConfig shown above, so add one if your auth flow needs it
//  authProvider = authService
//  // Potentially call authService.Login() here if needed at startup
// }

// // Then use this authProvider when creating the client.NewBaseClient
// baseClient := client.NewBaseClient(cfg.Client.BaseURL, httpClient, authProvider)

This setup provides a flexible way to manage client behavior and adapt to different MFO server environments and security requirements.


2️⃣ Running a Taskflow

The MFO Client can execute complex workflows defined in YAML using its Taskflow engine, which is based on go-taskflow. This allows for local orchestration of tasks, which can include calls to the MFO server or other operations.

Taskflow YAML Example (sample_flow.yaml):

This simple flow defines two tasks: one to send a notification and another to log an event after the notification is sent.

name: "NotifyAndLogFlow"
description: "A simple flow to send a notification and then log the event."

tasks:
  - id: "send_notification_task"
    name: "Send System Notification"
    type: "system_notify_task" # Custom task type to be registered
    config:
      recipient: "admin@example.com"
      message_template: "Event {{.eventName}} received for user {{.userID}}."
    successors:
      - "log_event_task"

  - id: "log_event_task"
    name: "Log Event Details"
    type: "simple_log_task" # Custom task type to be registered
    config:
      log_level: "INFO"
      log_message: "Notification sent for event {{.eventName}}, user {{.userID}}."
    dependencies:
      - "send_notification_task"

Go Example: Load and Execute Taskflow with FlowExecutor

This example demonstrates initializing the TaskRegistry, registering custom task factories, creating a FlowExecutor, and then (conceptually) loading and executing a flow definition.

// main.go (or a relevant package for taskflow execution)
package main

import (
    "context" // Using standard context for MFO API calls within tasks
    "fmt"
    "log"
    "strings"
    "text/template"

    // Go Taskflow library
    gtf "github.com/noneback/go-taskflow"

    // Adapt to your project structure for MFO client components
    "your_project_path/internal/models"   // For FlowDefinition, TaskFunc, custom task context if any
    "your_project_path/internal/taskflow" // For MFO-specific TaskRegistry, Builder, FlowExecutor wrappers
    // "your_project_path/internal/api"      // If tasks need direct access to specialized API clients
)

// --- Example Custom Task Implementations ---

// configString returns the string stored under key in a task's config map, or
// an error when the key is absent or holds a non-string value.
func configString(config map[string]interface{}, key string) (string, error) {
    value, ok := config[key].(string)
    if !ok {
        return "", fmt.Errorf("task config %q must be a string, got %T", key, config[key])
    }
    return value, nil
}

// parseMessageTemplate compiles text as a text/template named after the config
// key it came from. Referencing a key that is not supplied at render time is
// reported as an error instead of silently printing a placeholder.
func parseMessageTemplate(name, text string) (*template.Template, error) {
    tmpl, err := template.New(name).Option("missingkey=error").Parse(text)
    if err != nil {
        return nil, fmt.Errorf("parsing %s template: %w", name, err)
    }
    return tmpl, nil
}

// renderMessage executes tmpl with the two values these example tasks expose
// to templates: {{.eventName}} and {{.userID}}.
func renderMessage(tmpl *template.Template, eventName, userID string) (string, error) {
    var out strings.Builder
    data := map[string]string{"eventName": eventName, "userID": userID}
    if err := tmpl.Execute(&out, data); err != nil {
        return "", fmt.Errorf("rendering %s template: %w", tmpl.Name(), err)
    }
    return out.String(), nil
}

// SimpleLogTaskFactory creates a task that logs a message.
//
// Required config keys (both strings):
//   - "log_level":   label printed in front of the log line
//   - "log_message": text/template source; may use {{.eventName}} and {{.userID}}
//
// The factory returns an error when a key is missing, is not a string, or the
// template does not parse. The returned task renders the message from the
// flow context, logs it, and sets "log_output_status" in the context.
func SimpleLogTaskFactory(config map[string]interface{}) (models.TaskFunc, error) {
    logLevel, err := configString(config, "log_level")
    if err != nil {
        return nil, err
    }
    logMessageText, err := configString(config, "log_message")
    if err != nil {
        return nil, err
    }
    // Parse once at build time so a malformed template fails flow construction
    // rather than the first execution.
    logMessageTmpl, err := parseMessageTemplate("log_message", logMessageText)
    if err != nil {
        return nil, err
    }

    return func(ctx *gtf.FlowContext) error { // Assuming models.TaskFunc is compatible with gtf.TaskFunc
        // A missing or non-string context value deliberately renders as an
        // empty string, so the assertion result is ignored.
        eventName, _ := ctx.Get("eventName").(string)
        userID, _ := ctx.Get("userID").(string)

        logMessage, err := renderMessage(logMessageTmpl, eventName, userID)
        if err != nil {
            return err
        }

        log.Printf("[%s] Task '%s': %s", logLevel, ctx.Task().ID(), logMessage)
        ctx.Set("log_output_status", "logged successfully") // Example of setting output
        return nil
    }, nil
}

// SystemNotifyTaskFactory (Conceptual - simulates sending a notification)
//
// Required config keys (both strings):
//   - "recipient":        who the notification is addressed to
//   - "message_template": text/template source; may use {{.eventName}} and {{.userID}}
//
// The factory returns an error when a key is missing, is not a string, or the
// template does not parse. The returned task only logs the rendered message
// and sets "notification_status" in the context; no notification is sent.
func SystemNotifyTaskFactory(config map[string]interface{}) (models.TaskFunc, error) {
    recipient, err := configString(config, "recipient")
    if err != nil {
        return nil, err
    }
    messageText, err := configString(config, "message_template")
    if err != nil {
        return nil, err
    }
    messageTmpl, err := parseMessageTemplate("message_template", messageText)
    if err != nil {
        return nil, err
    }

    return func(ctx *gtf.FlowContext) error {
        // mfoApiClient, _ := ctx.Get("mfoApiClient").(api.ClientInterface) // Get MFO client if needed
        // A missing or non-string context value deliberately renders as an
        // empty string, so the assertion result is ignored.
        eventName, _ := ctx.Get("eventName").(string)
        userID, _ := ctx.Get("userID").(string)

        message, err := renderMessage(messageTmpl, eventName, userID)
        if err != nil {
            return err
        }

        log.Printf("[INFO] Task '%s': Sending notification to %s: %s", ctx.Task().ID(), recipient, message)
        // In a real task, this would use an MFO NotificationClient or similar.
        // e.g., mfoApiClient.NotificationClient().CreateNotification(context.Background(), ...)
        ctx.Set("notification_status", "sent")
        return nil
    }, nil
}

func main() { // Assuming this is part of a larger application setup
    // Step 1: create the registry that maps task type strings (from YAML) to
    // Go factory functions.
    registry := taskflow.NewTaskRegistry() // Use your MFO client's TaskRegistry implementation

    // Step 2: register one factory per custom task type the flow uses.
    // Registration failures are fatal because the flow cannot be built without them.
    if regErr := registry.RegisterTask("simple_log_task", SimpleLogTaskFactory); regErr != nil {
        log.Fatalf("Failed to register 'simple_log_task': %v", regErr)
    }
    if regErr := registry.RegisterTask("system_notify_task", SystemNotifyTaskFactory); regErr != nil {
        log.Fatalf("Failed to register 'system_notify_task': %v", regErr)
    }

    // Step 3: create the FlowExecutor. It needs the registry to build tasks and
    // may additionally take an MFO API client to inject into task contexts:
    // mfoApiClient := ... (initialize your main MFO API client as shown in previous sections)
    workerLimit := 4 // Example: at most 4 tasks run concurrently within one flow execution
    flowExecutor := taskflow.NewFlowExecutor(registry, workerLimit /*, mfoApiClient */) // Pass MFO client if tasks need it

    // Step 4: obtain the flow definition (Conceptual).
    // A real application would parse the YAML file into a models.FlowDefinition:
    /*
       flowDef, err := taskflow.LoadFlowDefinitionFromFile("path/to/your/sample_flow.yaml")
       if err != nil {
           log.Fatalf("Failed to load flow definition: %v", err)
       }
    */
    // Here the same definition is built by hand, mirroring the YAML above.
    notifyTask := models.TaskDefinition{
        ID:   "send_notification_task",
        Type: "system_notify_task",
        Config: map[string]interface{}{
            "recipient":        "admin@example.com",
            "message_template": "Event {{.eventName}} received for user {{.userID}}.",
        },
        Successors: []string{"log_event_task"},
    }
    logTask := models.TaskDefinition{
        ID:   "log_event_task",
        Type: "simple_log_task",
        Config: map[string]interface{}{
            "log_level":   "INFO",
            "log_message": "Notification sent for event {{.eventName}}, user {{.userID}}.",
        },
        Dependencies: []string{"send_notification_task"},
    }
    flowDef := &models.FlowDefinition{
        Name:  "NotifyAndLogFlow",
        Tasks: []models.TaskDefinition{notifyTask, logTask},
    }

    // Step 5: seed the flow context with the trigger data. Tasks read these
    // values and may write their own outputs back to the context.
    initialContext := map[string]interface{}{
        "eventName": "user_signup",
        "userID":    "usr_12345",
        // "mfoApiClient": mfoApiClient, // If injecting client directly, though FlowExecutor might handle this
    }

    // Step 6: run the flow. ExecuteFlow is the MFO client's higher-level entry
    // point: it builds the gtf.Flow from the definition and then runs it.
    log.Printf("Executing taskflow '%s'...", flowDef.Name)
    executionOptions := &gtf.ExecutionOptions{EnableRecovery: true} // Example options
    if runErr := flowExecutor.ExecuteFlow(context.Background(), flowDef, initialContext, executionOptions); runErr != nil {
        log.Fatalf("Taskflow '%s' execution failed: %v", flowDef.Name, runErr)
    }
    log.Printf("Taskflow '%s' executed successfully.", flowDef.Name)

    // Note: the FlowExecutor typically wraps the lower-level go-taskflow calls.
    // It uses a Builder (with the TaskRegistry) to turn the models.FlowDefinition
    // into a gtf.Flow and then runs that flow through a gtf.Executor, which gives
    // more control over context than calling Execute on the flow directly.
}

Explanation:

  1. TaskRegistry: This is where you tell the system what Go code to run for a given task type string found in your YAML. Each task type is mapped to a "factory" function that creates an executable task instance.
  2. Task Factories (SimpleLogTaskFactory, SystemNotifyTaskFactory): These are Go functions that take task configuration (from YAML) and return a models.TaskFunc. The TaskFunc is what go-taskflow actually executes. It receives a FlowContext which allows it to get/set data and access task details.
  3. FlowExecutor: This is the MFO client's component responsible for the end-to-end process of running a taskflow. It typically:
    • Takes your models.FlowDefinition (parsed from YAML).
    • Uses an internal taskflow.Builder (which in turn uses the TaskRegistry) to construct an executable gtf.Flow object.
    • Manages the execution of this gtf.Flow using a gtf.Executor from the go-taskflow library, handling concurrency, context propagation (including potentially injecting an MFO API client instance for tasks to use), and error handling.
  4. flowExecutor.ExecuteFlow(...): This is the primary method you'd call. It encapsulates the building and running of the flow.
  5. Data Passing: Tasks can receive initial data from the initialContext passed to ExecuteFlow. They can also pass data to subsequent tasks by setting values in their FlowContext (e.g., ctx.Set("outputKey", value)). Downstream tasks can then retrieve this (e.g., ctx.Get("previousTask.outputKey") or by having the values directly injected if the templating in task configs supports it).

This approach provides a structured and extensible way to define and run local workflows within the MFO Client.


🌀 Example 1: Simple Webhook Flow

This shows a basic flow from webhook reception to server forwarding using the structured MFO API client.

Diagram:

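The sequence diagram illustrates the interaction: an external system posts a webhook, the MFO Client validates it (conceptually), then uses its API client (e.g., EventClient) to forward the processed event to the MFO Server. The diagram shows the logical order of the steps; in the handler below, the 202 Accepted response is returned as soon as the event has been parsed, and the forwarding runs asynchronously on a background goroutine.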

sequenceDiagram
    participant User as External System
    participant ClientApp as MFO Client Application
    participant EventClient as MFO EventClient (part of ClientApp)
    participant MFOServer as MFO Server

    User->>+ClientApp: POST /webhook (Raw Event)
    ClientApp->>ClientApp: Verify Signature & Parse Event
    ClientApp->>+EventClient: Prepare MFO Event (e.g., models.Event)
    EventClient->>+MFOServer: POST /api/events (Authenticated Request with MFO Event)
    MFOServer-->>-EventClient: API Response (e.g., 200 OK, EventID)
    EventClient-->>-ClientApp: Forwarding Result
    ClientApp-->>-User: 202 Accepted (or error if initial validation failed)

Go Code Overview:

This simplified handler demonstrates receiving a webhook and initiating the forwarding process. It assumes eventClient is an initialized api.EventClientInterface available to the handler (e.g., via dependency injection or closure, as shown in the Quick Start).

// Simplified Webhook Handler (part of your main application or handlers package)

// import (
//  "context"
//  "encoding/json"
//  "log"
//  "net/http"
//  "time"
//  "your_project_path/internal/api"      // For EventClientInterface
//  "your_project_path/internal/models"   // For your Event struct
// )

// SimpleWebhookHandler returns an http.HandlerFunc that decodes the request
// body into a models.Event, forwards it through eventClient on a background
// goroutine and immediately answers 202 Accepted.
//
// A body that is not valid JSON, or that exceeds the size limit, is answered
// with 400 Bad Request. Forwarding failures happen after the response has been
// sent and are therefore only logged.
func SimpleWebhookHandler(eventClient api.EventClientInterface) http.HandlerFunc {
    // maxBodyBytes caps how much of an untrusted request body is decoded.
    const maxBodyBytes = 1 << 20 // 1 MiB; raise it if your provider sends larger payloads

    return func(w http.ResponseWriter, r *http.Request) {
        // 1. Basic Validation & Parsing (Signature verification should also be done here)
        // The net/http server closes r.Body itself once the handler returns,
        // so no explicit Close is needed.
        var receivedEvent models.Event // Use your specific event structure from models
        body := http.MaxBytesReader(w, r.Body, maxBodyBytes)
        if err := json.NewDecoder(body).Decode(&receivedEvent); err != nil {
            http.Error(w, "Invalid event payload", http.StatusBadRequest)
            log.Printf("Error decoding webhook event: %v", err)
            return
        }

        log.Printf("Received webhook for event type: %s", receivedEvent.Type)

        // 2. Asynchronously forward the event using the structured API client.
        // A fresh context is used because r.Context() is cancelled when the
        // handler returns.
        go func(eventToForward models.Event) {
            ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) // Example timeout
            defer cancel()

            // The eventClient.SendEvent method handles the actual HTTP POST to the MFO server,
            // including authentication (via the underlying BaseClient) and error handling.
            // The `eventToForward` should match the expected input for `SendEvent`.
            response, err := eventClient.SendEvent(ctx, eventToForward)
            if err != nil {
                log.Printf("Error forwarding event via EventClient: %v", err)
                // Add more robust error handling: retry, dead-letter queue, etc.
                return
            }
            // NOTE(review): assumes SendEvent returns a non-nil response with an
            // ID field whenever err is nil — confirm against the client.
            log.Printf("Event forwarded successfully by EventClient. MFO Response ID (if any): %s", response.ID)
        }(receivedEvent)

        // 3. Acknowledge receipt quickly
        w.WriteHeader(http.StatusAccepted)
        // Written with w.Write so the snippet needs nothing beyond the imports listed above it.
        if _, err := w.Write([]byte("Event received and is being processed.\n")); err != nil {
            log.Printf("Error writing webhook acknowledgement: %v", err)
        }
    }
}

// In main.go, you would set this up like:
// initialize eventClient as shown in Quick Start or Configuration sections
// http.HandleFunc("/simple-webhook", SimpleWebhookHandler(eventClient))

This example focuses on the client's role in receiving an event and handing it off to a specialized MFO API client (EventClient) for secure and reliable forwarding. The EventClient itself (and the BaseClient it uses) would incorporate the authentication, error handling, and retry logic discussed in Core Concepts.


🧩 Example 2: Complex Workflow (with Subflows)

Scenario: A complex event, once received by the MFO Client, needs to trigger a locally orchestrated workflow involving multiple tasks with dependencies. Some of these tasks might need to interact with the MFO server (e.g., to fetch additional data, update a resource, or send a chat message) using the configured MFO API client.

Taskflow YAML (data_pipeline_flow.yaml):

This YAML defines a multi-step data ingestion pipeline.

name: "DataIngestionPipeline"
description: "A workflow to fetch, transform, store data, and notify."

tasks:
  - id: "fetch_external_data"
    name: "Fetch Data from External API"
    type: "http_get_task" # Custom task type for making HTTP GET requests
    config:
      url: "https://api.external.com/data/{{.input.dataID}}" # URL can be templated from flow context
      timeout: "10s"
    successors:
      - "transform_fetched_data"

  - id: "transform_fetched_data"
    name: "Transform Raw Data"
    type: "data_transform_task" # Custom task type for data manipulation
    dependencies: ["fetch_external_data"]
    config:
      transformation_rule: "ruleset_v1"
      # Input data from fetch_external_data.result (or similar) will be implicitly available
      # or explicitly mapped by the task implementation via FlowContext.
    successors:
      - "store_data_mfo"

  - id: "store_data_mfo"
    name: "Store Processed Data in MFO Resource"
    type: "mfo_resource_create_task" # Task type that uses the MFO API Client
    dependencies: ["transform_fetched_data"]
    config:
      resource_type: "processed_data"
      # Data to store comes from transform_fetched_data.output
      # Task implementation will use FlowContext to get this data and the MFO client.
    successors:
      - "notify_completion_mfo"

  - id: "notify_completion_mfo"
    name: "Notify Team via MFO Chat"
    type: "mfo_chat_send_task" # Task type that uses the MFO API Client
    dependencies: ["store_data_mfo"]
    config:
      thread_id: "team_notifications_thread"
      message_template: "Data ingestion pipeline for {{.input.dataID}} completed. New resource ID: {{.store_data_mfo.resourceID}}"
      # Task implementation will use FlowContext and MFO client.

Diagram:

Illustrates the flow of tasks in the pipeline.

graph TD
    A[Webhook Trigger: New Data Available] --> B(Run DataIngestionPipeline Flow);
    subgraph DataIngestionPipeline Flow
        direction LR
        T1[fetch_external_data] --> T2[transform_fetched_data];
        T2 --> T3["store_data_mfo (Uses MFO Client)"];
        T3 --> T4["notify_completion_mfo (Uses MFO Client)"];
    end

Go Code Overview and API Client Access in Tasks:

The execution of this taskflow would be managed by the FlowExecutor as shown in the "Running a Taskflow" example. The key aspect here is how tasks like mfo_resource_create_task or mfo_chat_send_task access the MFO API client.

  1. Client Injection by FlowExecutor: When the FlowExecutor is initialized, it should be provided with a fully configured MFO API client (e.g., an instance of BaseClientInterface or a more specific composite client struct containing all specialized clients like ChatClient, ResourceClient, etc.).

    // Conceptual: Initializing FlowExecutor with an MFO API Client
    // mfoBaseClient := client.NewBaseClient(cfg.Client.BaseURL, httpClient, authProvider)
    // mfoChatClient := api.NewChatClient(mfoBaseClient)
    // mfoResourceClient := api.NewResourceClient(mfoBaseClient)
    // ... other clients ...
    
    // taskflowRegistry := taskflow.NewTaskRegistry()
    // ... register task factories ...
    
    // The FlowExecutor can store the MFO client(s) and add them to the FlowContext
    // for each flow execution, or the factories themselves can close over the client.
    // flowExecutor := taskflow.NewFlowExecutor(taskflowRegistry, concurrency, mfoBaseClient /* or a struct of clients */)
    
  2. Accessing Client in Task Factories: The task factory for an MFO-interacting task (e.g., MfoResourceCreateTaskFactory) would then retrieve the API client from the gtf.FlowContext during task execution.

    // Conceptual: Factory for a task that creates an MFO resource.
    //
    // Required config key: "resource_type" (string). The factory returns an
    // error when it is missing or not a string.
    //
    // The returned task expects two values in the flow context:
    //   - "mfoResourceClient": an api.ResourceClientInterface (the key must
    //     match how the FlowExecutor provides it)
    //   - "transform_fetched_data.output": the string content to store (the key
    //     format depends on how the upstream task publishes its output)
    // On success it sets "resourceID" in the context for subsequent tasks.
    func MfoResourceCreateTaskFactory(config map[string]interface{}) (models.TaskFunc, error) {
        resourceType, ok := config["resource_type"].(string)
        if !ok {
            return nil, fmt.Errorf("task config %q must be a string, got %T", "resource_type", config["resource_type"])
        }

        return func(ctx *gtf.FlowContext) error {
            // Retrieve the MFO ResourceClient from the flow context. The checked
            // assertion covers both "key absent" and "wrong type" without panicking.
            mfoResourceClient, ok := ctx.Get("mfoResourceClient").(api.ResourceClientInterface)
            if !ok {
                return fmt.Errorf("mfoResourceClient not found in flow context")
            }

            // Get data to store from a previous task. Failing here avoids
            // creating an empty resource when the upstream output is missing.
            transformedData, ok := ctx.Get("transform_fetched_data.output").(string)
            if !ok {
                return fmt.Errorf("transform_fetched_data.output not found in flow context or not a string")
            }

            req := models.CreateResourceRequest{
                Type:    resourceType,
                Content: transformedData,
                // Metadata: ...
            }

            log.Printf("Task '%s': Creating MFO resource...", ctx.Task().ID())
            newResource, err := mfoResourceClient.CreateResource(context.Background(), req)
            if err != nil {
                // Returned (not logged) so the taskflow engine reports it once.
                return fmt.Errorf("creating MFO resource: %w", err)
            }

            log.Printf("Task '%s': MFO resource created with ID: %s", ctx.Task().ID(), newResource.ID)
            ctx.Set("resourceID", newResource.ID) // Make the new resource ID available to subsequent tasks
            return nil
        }, nil
    }
    
  3. Registration and Execution: This factory (MfoResourceCreateTaskFactory) would be registered with the TaskRegistry against the type mfo_resource_create_task. The overall flow execution (flowExecutor.ExecuteFlow(...)) remains the same as previously described.

// Conceptual main execution snippet (building on previous examples)

// func main() {
//     // ... (config loading, MFO client initialization, registry setup) ...

//     registry.RegisterTask("http_get_task", HttpGetTaskFactory) // Assuming HttpGetTaskFactory is defined
//     registry.RegisterTask("data_transform_task", DataTransformTaskFactory) // Assuming DataTransformTaskFactory is defined
//     registry.RegisterTask("mfo_resource_create_task", MfoResourceCreateTaskFactory)
//     registry.RegisterTask("mfo_chat_send_task", MfoChatSendTaskFactory) // Assuming MfoChatSendTaskFactory is defined

//     flowExecutor := taskflow.NewFlowExecutor(registry, 4, mfoClients) // mfoClients could be a struct with all needed clients

//     flowDef, err := taskflow.LoadFlowDefinitionFromFile("data_pipeline_flow.yaml")
//     if err != nil { log.Fatalf("Failed to load flow: %v", err) }

//     initialData := map[string]interface{}{"input": map[string]string{"dataID": "some-data-id-123"}}
//     err = flowExecutor.ExecuteFlow(context.Background(), flowDef, initialData, nil)
//     if err != nil { log.Printf("Workflow execution error: %v", err) }
// }

This example illustrates how the MFO Client's taskflow engine can orchestrate complex sequences of operations, including tasks that interact with the MFO server via its structured API client. The key is the proper setup of the TaskRegistry with task factories that can access the API client through the FlowContext provided by the FlowExecutor.


Advanced: Using Specialized API Clients in Taskflows

While the MFO Client's primary role might be event listening and forwarding, its local taskflow engine can execute more complex workflows. Tasks within these flows might need to interact with the MFO server using various specialized API clients beyond just forwarding events. These clients are typically initialized using a shared BaseClient instance, which would be configured with authentication and error handling policies.

Assume you have access to an initialized baseClient (of type api.BaseClientInterface) within your task execution context.

1. Using the ResourceClient

The ResourceClient allows tasks to manage resources (e.g., documents, data objects) on the MFO server.

Example: Creating a Resource

import (
    "context"
    "log"
    "time"

    "your_mfo_client_project/internal/api"      // Where NewResourceClient and ResourceClientInterface are
    "your_mfo_client_project/internal/models" // Where MFO server data models are defined
)

// CreateNewResource creates a resource on the MFO server using a ResourceClient
// built from baseClient.
//
// The request carries resourceType, content, and metadata recording the creator
// ("mfo-client-taskflow") and the current UTC time in RFC 3339 format.
// On success it returns the created resource; on failure it logs the error and
// returns it unchanged with a nil resource.
func CreateNewResource(ctx context.Context, baseClient api.BaseClientInterface, resourceType, content string) (*models.Resource, error) {
    client := api.NewResourceClient(baseClient)

    metadata := map[string]interface{}{
        "created_by": "mfo-client-taskflow",
        "timestamp":  time.Now().UTC().Format(time.RFC3339),
    }
    request := models.CreateResourceRequest{
        Type:     resourceType,
        Content:  content,
        Metadata: metadata,
    }

    log.Printf("Attempting to create resource of type '%s'...", resourceType)
    created, createErr := client.CreateResource(ctx, request)
    if createErr != nil {
        // Callers may want to inspect the error further (e.g., check for *models.APIError).
        log.Printf("Failed to create resource: %v", createErr)
        return nil, createErr
    }

    log.Printf("Successfully created resource with ID: %s", created.ID)
    return created, nil
}

/*
// How you might call this within a task (conceptual):
func MyTask(flowCtx *taskflow.FlowContext) {
    baseClient := flowCtx.Get("mfoBaseClient").(api.BaseClientInterface) // Assuming BaseClient is in context

    ctx, cancel := context.WithTimeout(flowCtx.Context(), 30*time.Second)
    defer cancel()

    _, err := CreateNewResource(ctx, baseClient, "document", "This is some important content.")
    if err != nil {
        // Task-level error handling
    }
}
*/
This example demonstrates initializing a ResourceClient and using it to create a new resource on the MFO server. The request includes the resource type, content, and some metadata.

2. Using the ChatClient

The ChatClient can be used to send messages to threads or interact with chat functionalities on the MFO server.

Example: Sending a Chat Message

import (
    "context"
    "log"
    "time"

    "your_mfo_client_project/internal/api"
    "your_mfo_client_project/internal/models"
)

// SendChatMessage sends messageContent to the thread identified by threadID
// using a ChatClient built from baseClient.
//
// The request metadata records the source ("mfo-client-taskflow") and the
// current UTC time in RFC 3339 format. On success it returns the server's chat
// response; on failure it logs the error and returns it unchanged with a nil
// response.
func SendChatMessage(ctx context.Context, baseClient api.BaseClientInterface, threadID, messageContent string) (*models.ChatResponse, error) {
    client := api.NewChatClient(baseClient)

    metadata := map[string]interface{}{
        "source":  "mfo-client-taskflow",
        "sent_at": time.Now().UTC().Format(time.RFC3339),
    }
    request := models.ChatRequest{
        ThreadID: threadID,
        Message:  messageContent,
        Metadata: metadata,
    }

    log.Printf("Attempting to send message to thread '%s'...", threadID)
    response, sendErr := client.SendMessage(ctx, request)
    if sendErr != nil {
        log.Printf("Failed to send chat message: %v", sendErr)
        return nil, sendErr
    }

    log.Printf("Successfully sent message. Message ID: %s", response.MessageID)
    return response, nil
}

/*
// How you might call this within a task (conceptual):
func MyNotifyTask(flowCtx *taskflow.FlowContext) {
    baseClient := flowCtx.Get("mfoBaseClient").(api.BaseClientInterface)

    ctx, cancel := context.WithTimeout(flowCtx.Context(), 15*time.Second)
    defer cancel()

    _, err := SendChatMessage(ctx, baseClient, "support-thread-123", "Task X has completed successfully.")
    if err != nil {
        // Task-level error handling
    }
}
*/
This snippet shows how to send a message to a specific thread ID using the ChatClient.

3. Using the JobClient

The JobClient allows interaction with the job system on the MFO server, for example, to create new jobs or check the status of existing ones.

Example: Creating a Job

import (
    "context"
    "log"
    "time"

    "your_mfo_client_project/internal/api"
    "your_mfo_client_project/internal/models"
)

// CreateNewJob creates a job of type jobType with the given inputData on the
// MFO server using a JobClient built from baseClient.
//
// The request metadata records the initiator ("mfo-client-taskflow") and the
// current UTC time in RFC 3339 format. On success it returns the created job;
// on failure it logs the error and returns it unchanged with a nil job.
func CreateNewJob(ctx context.Context, baseClient api.BaseClientInterface, jobType string, inputData interface{}) (*models.Job, error) {
    client := api.NewJobClient(baseClient)

    metadata := map[string]interface{}{
        "initiated_by": "mfo-client-taskflow",
        "request_time": time.Now().UTC().Format(time.RFC3339),
    }
    request := models.CreateJobRequest{
        Type:     jobType,
        Input:    inputData,
        Metadata: metadata,
    }

    log.Printf("Attempting to create job of type '%s'...", jobType)
    created, createErr := client.CreateJob(ctx, request)
    if createErr != nil {
        log.Printf("Failed to create job: %v", createErr)
        return nil, createErr
    }

    log.Printf("Successfully created job with ID: %s, Status: %s", created.ID, created.Status)
    return created, nil
}

/*
// How you might call this within a task (conceptual):
func MyProcessingTask(flowCtx *taskflow.FlowContext) {
    baseClient := flowCtx.Get("mfoBaseClient").(api.BaseClientInterface)

    ctx, cancel := context.WithTimeout(flowCtx.Context(), 60*time.Second) // Longer timeout for job creation
    defer cancel()

    jobInput := map[string]interface{}{"data_url": "s3://bucket/path/to/data.csv", "param": "value"};
    _, err := CreateNewJob(ctx, baseClient, "data-processing-job", jobInput)
    if err != nil {
        // Task-level error handling
    }
}
*/
This example illustrates creating a new job on the MFO server by specifying a job type and input data.

These examples provide a starting point for integrating various MFO server interactions directly into your mfo-client's taskflows. Remember to handle errors robustly and manage context (e.g., timeouts, cancellation) appropriately within your tasks.


🎥 Logs & Monitoring

Metaphor: The CCTV System – Every action is monitored and logged for transparency and debugging.

Go Code: Logging Events

// Record the incoming event with field names for easier debugging.
log.Printf("Received event: %+v", event)

// Forward the event and log the outcome; err is scoped to this statement.
if err := forwardEvent(event); err != nil {
    log.Printf("Forwarding failed: %v", err)
} else {
    log.Println("Event successfully forwarded to MFO server")
}

Typical Logs:

2025/05/09 12:00:00 Received event: {Type:user_signup Payload:map[...]}
2025/05/09 12:00:00 Event successfully forwarded to MFO server

Use tools like Prometheus or Grafana for deeper monitoring of request counts, failures, and latencies.


By following these examples, you'll have a fully functional Client setup, from simple webhook forwarding to complex, orchestrated workflows.