Core Concepts
Understanding the following core components is crucial for using `mfo-client` effectively.
API Client (`internal/client`, `internal/api`)
The API client is the central component for interacting with the MFO server's REST API. It is designed with a modular architecture based on interfaces to facilitate maintenance and testability. This client handles HTTP communication, request serialisation and response deserialisation, the addition of authentication headers, and error handling.
Client Architecture
1. **Base Client (`internal/client/client.go`)**:
* Provides basic HTTP functionality (GET, POST, PUT, DELETE).
* Supports an `AuthProvider` to handle authentication transparently.
* Is used by specialised clients to execute requests.
```go
// Example structure for BaseClient
// internal/client/client.go
package client

import (
	"context"
	"net/http"

	"your_project_path/internal/auth" // Adjust the path
)

type BaseClient struct {
	baseURL    string
	httpClient *http.Client
	auth       auth.AuthProvider // Interface for the authentication provider
}

func NewBaseClient(baseURL string, httpClient *http.Client, authProvider auth.AuthProvider) *BaseClient {
	return &BaseClient{
		baseURL:    baseURL,
		httpClient: httpClient,
		auth:       authProvider,
	}
}

// Implementations of the Get, Post, Put, Delete methods...
// (See section 2.2.1 of the Main Doc for more details)
```
2. **Client Interfaces (`internal/api/interfaces.go`)**:
* Define contracts for the various features of the MFO API (Chat, Resources, Tasks, etc.).
* Allow dependency injection and facilitate testing.
```go
// Example of interfaces
// internal/api/interfaces.go
package api

import (
	"context"

	"your_project_path/internal/models" // Adjust the path
)

type BaseClientInterface interface {
	Get(ctx context.Context, path string, result interface{}) error
	Post(ctx context.Context, path string, body interface{}, result interface{}) error
	Put(ctx context.Context, path string, body interface{}, result interface{}) error
	Delete(ctx context.Context, path string, result interface{}) error
}

type ChatClientInterface interface {
	SendMessage(ctx context.Context, req models.ChatRequest) (*models.ChatResponse, error)
	GetThread(ctx context.Context, threadID string) (*models.Thread, error)
}

type ResourceClientInterface interface {
	GetResource(ctx context.Context, resourceID string) (*models.Resource, error)
	CreateResource(ctx context.Context, req models.CreateResourceRequest) (*models.Resource, error)
	// ... other methods
}

// ... other interfaces (MemoryClientInterface, ToolClientInterface, JobClientInterface, etc.)
// (See section 2.2.2 of the Main Doc for the complete list)
```
3. **Specialised Clients (`internal/api/*.go`)**:
* Implement specific interfaces (e.g., `ChatClient`, `ResourceClient`).
* Use `BaseClient` to make actual HTTP calls.
* Abstract business logic for each domain of the API.
```go
// Example ChatClient
// internal/api/chat.go
package api

import (
	"context"

	"your_project_path/internal/models" // Adjust the path
)

type ChatClient struct {
	base BaseClientInterface
}

func NewChatClient(base BaseClientInterface) *ChatClient {
	return &ChatClient{base: base}
}

func (c *ChatClient) SendMessage(ctx context.Context, req models.ChatRequest) (*models.ChatResponse, error) {
	var response models.ChatResponse
	// The exact path (e.g., "/api/chat") depends on the MFO server's SLOP specification.
	if err := c.base.Post(ctx, "/api/chat", req, &response); err != nil {
		// Return nil rather than a pointer to a half-filled response.
		return nil, err
	}
	return &response, nil
}

// ... implementation of GetThread, etc.
```
#### Example of Use
```go
package main

import (
	"context"
	"log"
	"net/http"

	"your_project_path/internal/api"    // Adjust the path
	"your_project_path/internal/auth"   // Adjust the path
	"your_project_path/internal/client" // Adjust the path
	"your_project_path/internal/models" // Adjust the path
)

func main() {
	// 1. Initialise the authentication provider (AuthProvider)
	// (See section 2.3 of the Main Doc for the implementation of AuthProvider)
	// For this example, let's assume that authProvider is a valid instance.
	var authProvider auth.AuthProvider
	// Example: authProvider := auth.NewAuth("https://mfo.example.com/auth", http.DefaultClient)
	// err := authProvider.Login("user@example.com", "password")
	// if err != nil { log.Fatal(err) }

	// 2. Initialise the BaseClient
	baseClient := client.NewBaseClient("https://mfo.example.com", http.DefaultClient, authProvider)

	// 3. Create specialised clients
	chatClient := api.NewChatClient(baseClient)
	resourceClient := api.NewResourceClient(baseClient) // Assuming that NewResourceClient exists

	// 4. Use clients
	chatReq := models.ChatRequest{
		Message:  "Hello MFO!",
		ThreadID: "thread-abc-123",
	}
	chatResp, err := chatClient.SendMessage(context.Background(), chatReq)
	if err != nil {
		log.Printf("Error sending the message: %v", err)
	} else {
		log.Printf("Chat response: ID=%s, Content=%s", chatResp.MessageID, chatResp.Content)
	}

	resource, err := resourceClient.GetResource(context.Background(), "resource-xyz-789")
	if err != nil {
		log.Printf("Error retrieving the resource: %v", err)
	} else {
		log.Printf("Resource retrieved: ID=%s, Type=%s", resource.ID, resource.Type)
	}
}
```
API Client Error Handling
The `BaseClient` is responsible for standard handling of HTTP responses and errors. API errors are typically encapsulated in an `APIError` structure.
// internal/client/client.go (extract for error handling)
// ... (in the handleResponse method of BaseClient, for example)
// if resp.StatusCode >= 400 {
// 	var apiErr models.APIError // Assuming that APIError is defined in models
// 	if err := json.NewDecoder(resp.Body).Decode(&apiErr); err != nil {
// 		// Error decoding the error response itself
// 		return fmt.Errorf("error decoding the error response: %w", err)
// 	}
// 	// Enrich apiErr with the StatusCode if not already done
// 	// apiErr.StatusCode = resp.StatusCode (if necessary)
// 	return &apiErr // Return the structured API error
// }
// ...
(See the Main Doc for details on `APIError` and error handling.) This structure allows for clear, testable, and maintainable interaction with the MFO server.
Authentication (`internal/auth`)
Secure communication with the private `/api/*` endpoints of the MFO server is handled by the authentication module. This module is crucial because it ensures that only authorised requests reach the server. It supports token-based authentication (e.g. JWT) and integrates with the `BaseClient` to automatically add the necessary authentication headers.
Key Concepts of Authentication
- `AuthProvider` Interface:
  - Defines a contract for mechanisms that obtain authentication information (e.g., a token).
  - Allows different authentication strategies (OAuth, API Key, JWT).
  - The `BaseClient` uses an implementation of this interface to retrieve the authorisation header.

// Example AuthProvider interface
// internal/auth/auth.go (or a dedicated interfaces file)
package auth

type AuthProvider interface {
	// GetAuthHeader returns the value of the "Authorization" header (e.g., "Bearer <token>").
	GetAuthHeader() (string, error)
	// Login is optional, for explicit login flows.
	Login(credentials interface{}) error
	// ... other relevant methods (e.g., RefreshToken)
}
- `Auth` Implementation (`internal/auth/auth.go`):
  - A concrete implementation for managing the acquisition and secure storage of tokens.
  - Typically includes a `Login` method for authenticating with the server (e.g., with email/password) and retrieving a token.
  - Manages the token in a thread-safe manner.

// Simplified example of an Auth structure
// internal/auth/auth.go
package auth

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	// "your_project_path/internal/models" // For LoginRequest, LoginResponse
)

// Let's assume that LoginRequest and LoginResponse are defined in models
type LoginRequest struct {
	Email    string `json:"email"`
	Password string `json:"password"`
}

type LoginResponse struct {
	Token string `json:"token"`
}

type Auth struct {
	baseURL    string // Base URL for authentication endpoints (e.g., https://mfo.example.com/auth)
	httpClient *http.Client
	token      string
	mu         sync.RWMutex // For secure concurrent access to the token
}

func NewAuth(baseURL string, httpClient *http.Client) *Auth {
	return &Auth{
		baseURL:    baseURL,
		httpClient: httpClient,
	}
}

// Login authenticates and stores the token.
// Note: the AuthProvider interface sketched in this guide declares
// Login(credentials interface{}); the two signatures must be aligned
// for *Auth to satisfy that interface.
func (a *Auth) Login(email, password string) error {
	reqBody, err := json.Marshal(LoginRequest{Email: email, Password: password})
	if err != nil {
		return fmt.Errorf("marshaling login request: %w", err)
	}
	// The exact path (e.g. "/login") depends on the MFO server specifications.
	resp, err := a.httpClient.Post(a.baseURL+"/login", "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return fmt.Errorf("login request failed: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// Handle login errors (e.g., read the body of the error response)
		return fmt.Errorf("login failed, status: %s", resp.Status)
	}
	var loginResp LoginResponse
	if err := json.NewDecoder(resp.Body).Decode(&loginResp); err != nil {
		return fmt.Errorf("error decoding login response: %w", err)
	}
	// Hold the lock only while storing the token, not during the network call.
	a.mu.Lock()
	a.token = loginResp.Token
	a.mu.Unlock()
	return nil
}

// GetAuthHeader implements AuthProvider and returns the authorisation header
func (a *Auth) GetAuthHeader() (string, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()
	if a.token == "" {
		return "", fmt.Errorf("no token available, please log in first")
	}
	return "Bearer " + a.token, nil
}

// SetToken allows you to manually set a token (useful if obtained by other means)
func (a *Auth) SetToken(token string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.token = token
}

(See section 2.3 of the Main Doc for details on `GetToken`, `ClearToken`, and error handling.)
Integration with `BaseClient`
The `BaseClient` receives an instance of `AuthProvider` during its initialisation. Before each API request, the `BaseClient` calls the `GetAuthHeader()` method of its `AuthProvider` to obtain the value of the `Authorization` header and adds it to the outgoing request.
// Simplified extract of the logic of a Post method in BaseClient
// func (c *BaseClient) Post(ctx context.Context, path string, body interface{}, result interface{}) error {
// 	// ... prepare the request ...
//
// 	authHeader, err := c.auth.GetAuthHeader() // Call to the AuthProvider
// 	if err != nil {
// 		return fmt.Errorf("authentication error: %w", err)
// 	}
// 	if authHeader != "" {
// 		req.Header.Set("Authorization", authHeader)
// 	}
//
// 	// ... execute the request ...
// }
This approach decouples the API logic from how authentication is handled, providing flexibility and security.
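The header injection described above can be checked end to end against an in-memory server. This is a minimal sketch, not the actual `BaseClient`: `getJSON`, `staticAuth`, `fetchFromStub` and the `/api/ping` path are hypothetical names, and the `AuthProvider` interface is reduced to the single method the request path needs.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// AuthProvider is reduced to the one method used when sending a request.
type AuthProvider interface {
	GetAuthHeader() (string, error)
}

// staticAuth is a hypothetical provider that always returns the same header value.
type staticAuth string

func (s staticAuth) GetAuthHeader() (string, error) { return string(s), nil }

// getJSON performs an authenticated GET and decodes the JSON body into result.
func getJSON(ctx context.Context, hc *http.Client, auth AuthProvider, url string, result interface{}) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return fmt.Errorf("building request: %w", err)
	}
	if auth != nil {
		header, err := auth.GetAuthHeader()
		if err != nil {
			return fmt.Errorf("authentication error: %w", err)
		}
		if header != "" {
			req.Header.Set("Authorization", header)
		}
	}
	resp, err := hc.Do(req)
	if err != nil {
		return fmt.Errorf("executing request: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 400 {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return json.NewDecoder(resp.Body).Decode(result)
}

// fetchFromStub starts a test server that echoes the Authorization header it
// received, calls it through getJSON, and returns the echoed value.
func fetchFromStub(header string) (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		json.NewEncoder(w).Encode(map[string]string{"auth": r.Header.Get("Authorization")})
	}))
	defer srv.Close()

	var out map[string]string
	err := getJSON(context.Background(), srv.Client(), staticAuth(header), srv.URL+"/api/ping", &out)
	return out["auth"], err
}

func main() {
	got, err := fetchFromStub("Bearer demo-token")
	fmt.Println(got, err)
}
```

Because the client sees only the interface, swapping `staticAuth` for a JWT or OAuth provider requires no change to the request code.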
Taskflow Engine (`internal/taskflow`, `go-taskflow`)
The `mfo-client` integrates a workflow engine based on the `go-taskflow` library (github.com/noneback/go-taskflow). This engine allows you to define, build, and execute complex task workflows (Taskflows) locally, which can orchestrate calls to the MFO server and other operations.
Key Components of the Taskflow Engine
- Workflow Definition (YAML):
  - Workflows are described declaratively in YAML files.
  - These definitions specify tasks, their types, configurations, dependencies, conditions, and the ability to compose sub-flows.
  - The YAML structure must follow the guidelines (e.g., `yaml-taskflow_directive.mdc`).
- `TaskRegistry` (`internal/taskflow/registry.go`):
  - A central registry that maps task `type` strings (from the YAML) to Go function factories (`models.TaskFactory`).
  - Each `TaskFactory` is responsible for creating an executable task instance (`models.TaskFunc`).
  - Custom tasks, especially those interacting with the MFO server, are registered here. These tasks will often need to access a configured instance of the MFO API client (passed via the flow execution context).

// internal/taskflow/registry.go (Concept)
package taskflow

import (
	"sync"
	// "your_project_path/internal/models"
)

// Let's assume that TaskFactory and TaskFunc are defined in models or a common package.
// type TaskFunc func(ctx *context.FlowContext)
// type TaskFactory func(config map[string]interface{}) (TaskFunc, error)

type TaskRegistry struct {
	mu            sync.RWMutex
	taskFactories map[string]interface{} // map[string]TaskFactory once the type is available
	// conditionFactories map[string]models.ConditionFactory // If conditions are supported
}

func NewTaskRegistry() *TaskRegistry {
	return &TaskRegistry{taskFactories: make(map[string]interface{})}
}

func (r *TaskRegistry) RegisterTask(taskType string, factory interface{} /* TaskFactory */) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.taskFactories[taskType] = factory
}

func (r *TaskRegistry) GetTaskFactory(taskType string) (interface{} /* TaskFactory */, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	factory, ok := r.taskFactories[taskType]
	return factory, ok
}
- `FlowLoader` (`internal/taskflow/loader.go`, if existing or to be created):
  - Responsible for loading and parsing workflow definitions from YAML files into a Go structure (`models.FlowDefinition`).
  - (See section 2.4.3 of the Main Doc)
- `Builder` (`internal/taskflow/builder.go`):
  - Takes a flow definition (`models.FlowDefinition` parsed from YAML) and the `TaskRegistry`.
  - Builds an executable flow graph (`gtf.Flow` from the `go-taskflow` library).
  - Resolves dependencies between tasks and connects them.

// internal/taskflow/builder.go (Concept)
package taskflow

import (
	"fmt"
	// gtf "github.com/noneback/go-taskflow"
	// "your_project_path/internal/models"
)

type Builder struct {
	registry *TaskRegistry
}

func NewBuilder(registry *TaskRegistry) *Builder {
	return &Builder{registry: registry}
}

func (b *Builder) BuildFlow(def interface{} /* *models.FlowDefinition */) (interface{} /* *gtf.Flow */, error) {
	// flow := gtf.NewFlow()
	// // Logic for iterating over def.Tasks, using b.registry.GetTaskFactory, creating gtf.Task...
	// // Add tasks and dependencies to the flow.
	// return flow, nil
	return nil, fmt.Errorf("BuildFlow not implemented") // Placeholder implementation
}
- `FlowExecutor` (`internal/taskflow/executor.go`, or `gtf.Executor` directly):
  - Executes the constructed `gtf.Flow` graph.
  - Handles task scheduling, dependency enforcement, concurrency (configurable), and execution of task Go functions.
  - Allows passing an initial context and execution options.

// internal/taskflow/executor.go (Concept, could wrap gtf.Executor)
package taskflow

import (
	"fmt"
	// gtf "github.com/noneback/go-taskflow"
	// "your_project_path/internal/models"
	// "context" // For the FlowContext of go-taskflow
)

type FlowExecutor struct {
	// registry    *TaskRegistry
	// builder     *Builder
	// loader      *FlowLoader // If the loading logic is here
	// executor    gtf.Executor
	// concurrency int
}

func NewFlowExecutor(registry *TaskRegistry, concurrency int) *FlowExecutor {
	// return &FlowExecutor{
	// 	registry:    registry,
	// 	builder:     NewBuilder(registry),
	// 	loader:      NewFlowLoader(), // if applicable
	// 	executor:    gtf.NewExecutor(uint(concurrency)),
	// 	concurrency: concurrency,
	// }
	return nil // Placeholder implementation
}

// ExecuteFlow builds and executes a flow.
// flowDef: the definition parsed from YAML.
// initialContext: a map[string]interface{} for initial data.
// options: *gtf.ExecutionOptions to configure the execution.
func (e *FlowExecutor) ExecuteFlow(flowDef interface{} /* *models.FlowDefinition */, initialContext map[string]interface{}, options interface{} /* *gtf.ExecutionOptions */) error {
	// flow, err := e.builder.BuildFlow(flowDef)
	// if err != nil {
	// 	return fmt.Errorf("failed to build the flow: %w", err)
	// }
	//
	// // The go-taskflow context can be enriched here, for example with the MFO API client
	// execCtx := gtf.NewContext()
	// for k, v := range initialContext {
	// 	execCtx.Set(k, v)
	// }
	// // execCtx.Set("mfoApiClient", e.mfoApiClientInstance) // Example of injecting the API client
	//
	// result := e.executor.Run(flow, gtf.WithContext(execCtx), gtf.WithOptions(options))
	// return result.Error()
	return fmt.Errorf("ExecuteFlow not implemented") // Placeholder implementation
}
- Execution Context and Data Passing (`gtf.FlowContext`):
  - A context (`gtf.FlowContext`) is maintained and passed to each task during execution.
  - This context can contain:
    - An instance of the MFO API client (injected by the `FlowExecutor`), allowing tasks to communicate with the server.
    - A map for storing task outputs. Tasks can thus access the results of previous tasks (for example, using a syntax such as `${previous_task_id.output_key}` in their YAML configuration, which the Go task implementation will resolve by querying the context).
  - (See section 2.4.3 of the Main Doc for `gtf.WithContext`.)
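The `${previous_task_id.output_key}` references mentioned above have to be resolved by the task implementation. The sketch below shows one way this could work, assuming outputs are kept as a map of task ID to key/value pairs; the `resolve` function, the regular expression, and the set of characters allowed in identifiers are illustrative assumptions rather than the engine's actual behaviour.

```go
package main

import (
	"fmt"
	"regexp"
)

// placeholder matches ${task_id.output_key}; the allowed characters are an assumption.
var placeholder = regexp.MustCompile(`\$\{([A-Za-z0-9_-]+)\.([A-Za-z0-9_-]+)\}`)

// resolve replaces every placeholder in s with outputs[taskID][key].
// It returns an error naming the first reference that has no recorded output.
func resolve(s string, outputs map[string]map[string]string) (string, error) {
	var firstErr error
	out := placeholder.ReplaceAllStringFunc(s, func(match string) string {
		parts := placeholder.FindStringSubmatch(match)
		val, ok := outputs[parts[1]][parts[2]]
		if !ok {
			if firstErr == nil {
				firstErr = fmt.Errorf("unresolved reference %s", match)
			}
			return match
		}
		return val
	})
	if firstErr != nil {
		return "", firstErr
	}
	return out, nil
}

func main() {
	outputs := map[string]map[string]string{"fetch": {"title": "Q3 report"}}
	fmt.Println(resolve("Summarise ${fetch.title}", outputs))
}
```

Failing on an unresolved reference, rather than leaving the literal `${...}` in place, surfaces a misspelled task ID before the value reaches an API call.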
Example of Taskflow Engine Usage
package main

import (
	"log"

	"your_project_path/internal/taskflow" // Adjust the path
	// "your_project_path/internal/models" // For FlowDefinition, TaskFunc, etc.
	// "your_project_path/internal/api"    // For the MFO API client
	// Adapt the imports according to your project structure
)

// Example of a task factory that uses an MFO API client (injected via the context).
// Uncommenting it also requires the "context" and "fmt" imports.
// func MyApiTaskFactory(config map[string]interface{}) (models.TaskFunc, error) {
// 	targetResource, ok := config["resource_id"].(string) // Example config parameter
// 	if !ok {
// 		return nil, fmt.Errorf("resource_id must be a string")
// 	}
//
// 	return func(ctx *models.FlowContext) { // models.FlowContext should be compatible with gtf.FlowContext
// 		log.Printf("Executing MyApiTask for resource: %s", targetResource)
//
// 		// Retrieve the MFO API client from the context
// 		apiClientVal, ok := ctx.Get("mfoApiClient")
// 		if !ok {
// 			log.Println("Error: MFO API client not found in context")
// 			ctx.SetError(fmt.Errorf("mfoApiClient not found"))
// 			return
// 		}
// 		mfoApiClient := apiClientVal.(api.ResourceClientInterface) // Adapt to the actual client type
//
// 		// Use the API client
// 		resource, err := mfoApiClient.GetResource(context.Background(), targetResource)
// 		if err != nil {
// 			log.Printf("Error during API call in MyApiTask: %v", err)
// 			ctx.SetError(err) // Report the error to the flow engine
// 			return
// 		}
// 		log.Printf("MyApiTask retrieved the resource: %s", resource.Content)
// 		ctx.Set("myOutputKey", resource.Content) // Store an output in the context
// 	}, nil
// }

func main() {
	// 1. Initialise the TaskRegistry and register the task types
	registry := taskflow.NewTaskRegistry()
	// registry.RegisterTask("my_api_task_type", MyApiTaskFactory)
	// registry.RegisterTask("mfo_api_chat_send", MfoChatSendFactory) // Example of an MFO task

	// 2. Initialise the FlowExecutor (with the registry and the MFO API client if necessary for injection)
	// To inject the MFO API client, NewFlowExecutor could take it as an argument
	// and store it to add it to the flow execution context.
	concurrency := 4 // Number of concurrent tasks
	executor := taskflow.NewFlowExecutor(registry, concurrency /*, mfoApiClientInstance */)

	// 3. Load the flow definition from YAML (assuming that flowDef is of type *models.FlowDefinition)
	// flowDef, err := taskflow.LoadFromYAML("path/to/your_flow.yaml") // Via a FlowLoader
	// if err != nil {
	// 	log.Fatalf("Error loading the flow definition: %v", err)
	// }
	var flowDef interface{} // Placeholder for *models.FlowDefinition

	// 4. Define an initial context if necessary
	initialContext := map[string]interface{}{
		"processID": "process-xyz-123",
		"inputData": "important initial data",
	}

	// 5. Run the flow
	log.Println("Starting flow execution...")
	err := executor.ExecuteFlow(flowDef, initialContext, nil /* options */)
	if err != nil {
		log.Fatalf("Error while executing the flow: %v", err)
	}
	log.Println("Flow execution completed successfully.")
}
This architecture provides a solid foundation for automating complex processes involving interactions with the MFO server and other systems.
Data Models (`internal/models`)
The `internal/models` package (or an equivalent path in your project structure, e.g., a shared Go module if `mfo-client` and `mfo-orchestrator` are separate projects but share models) defines the essential Go data structures used throughout the MFO client library and for interactions with the MFO server.
- Alignment with the MFO Server: These models represent the entities and operations supported by the MFO server, as defined in section 2.5 ("Data Models") of the Main Document (`MFO Client Orchestrator Documentation-13.05.2025.md`). They include structures for Chat, Resources, Memory, Tools, Tasks (Jobs), Events, Notifications, etc., as well as API requests and responses.
- Internal Use: They are also used by the Taskflow engine for definitions such as `FlowDefinition` and `TaskDefinition`, and to type the data exchanged between tasks.
- Consistency: It is crucial to keep these data models synchronised with the definitions expected by the MFO server to avoid serialisation/deserialisation errors and incompatibilities.
For example, models such as `models.ChatRequest`, `models.Resource`, `models.Job`, and `models.APIError` are defined in this package and used by specialised API clients and the Taskflow engine. Always refer to the central model documentation (section 2.5 of the Main Doc) for the exact definitions.
Prompt Management (`internal/prompt`, `internal/prompts`)
For agentic workflows involving LLMs via the SLOP server's `/api/chat` endpoint, `mfo-client` includes utilities for managing prompts used in `ChatRequest` bodies:
- Parsing & Storage: Loading and managing prompt templates, potentially from Markdown files.
- Templating: Injecting variables (including data passed via `${task_id.output_key}`) into prompt templates before sending them in an API call.
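The templating step can be sketched with Go's standard `text/template` package. This guide does not say which templating engine `mfo-client` uses, so `text/template`, the `renderPrompt` helper, and the variable names below are stand-ins chosen for illustration.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// renderPrompt fills a prompt template with the given variables.
// The missingkey=error option makes a forgotten variable fail instead of
// silently producing an incomplete prompt.
func renderPrompt(tmpl string, vars map[string]string) (string, error) {
	t, err := template.New("prompt").Option("missingkey=error").Parse(tmpl)
	if err != nil {
		return "", fmt.Errorf("parsing prompt template: %w", err)
	}
	var sb strings.Builder
	if err := t.Execute(&sb, vars); err != nil {
		return "", fmt.Errorf("rendering prompt: %w", err)
	}
	return sb.String(), nil
}

func main() {
	prompt, err := renderPrompt(
		"Summarise in {{.language}}: {{.document}}",
		map[string]string{"language": "English", "document": "Q3 report"},
	)
	fmt.Println(prompt, err)
}
```

The rendered string would then be placed in the message field of a `ChatRequest` before the API call.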
Event Reception
Definition:
The Client listens for incoming HTTP requests (webhooks) or other types of events such as cron jobs or AI triggers.
Metaphor:
The Doorbell. The Client sits by the door and responds whenever someone rings (an event is sent).
Go Code Example (Webhook Listener):
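// internal/handlers/webhook.go
http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
	var event Event
	if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
		http.Error(w, "Invalid request", http.StatusBadRequest)
		return
	}
	// Forward the event to the MFO server asynchronously.
	// The request context is cancelled once the handler returns, so the
	// goroutine uses a background context. eventClient is assumed to be
	// initialised elsewhere (see forwardEvent in the next section).
	go func() {
		if err := forwardEvent(context.Background(), eventClient, event); err != nil {
			log.Printf("Asynchronous event forwarding failed: %v", err)
		}
	}()
	w.WriteHeader(http.StatusAccepted)
})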
Forwarding Events
Definition: Once an event is received, the Client packages it and sends it to the MFO server for processing using the structured API client.
Metaphor: The Pneumatic Tube System with Security Check. The Client securely wraps the event, authenticates, and then shoots it directly to the processing hub via the designated API client.
Go Code Example (Event Forwarding using API Client):
// internal/api/event_forwarder.go (or another suitable file name)
package api // or a more specific package such as 'forwarder'

import (
	"context"
	"log"
	// Adapt the imports according to your project structure:
	// "your_project_path/internal/models"
	// "your_project_path/internal/api/interfaces" // if EventClientInterface lives there
)

// Suppose "Event" is a type defined in your models (e.g., models.Event)
// and you have an EventClientInterface interface and an EventClient implementation.

/*
// Example interface and client for events (conceptual)
// in internal/api/interfaces.go:
type EventClientInterface interface {
	SendEvent(ctx context.Context, event models.Event) (*models.EventResponse, error) // Adjust the response models
}

// in internal/api/event_client.go:
type EventClient struct {
	base BaseClientInterface // The BaseClientInterface defined earlier
}

func NewEventClient(base BaseClientInterface) *EventClient {
	return &EventClient{base: base}
}

func (c *EventClient) SendEvent(ctx context.Context, event models.Event) (*models.EventResponse, error) {
	var response models.EventResponse
	// The exact path (e.g., "/api/events") depends on the MFO server's SLOP specification.
	if err := c.base.Post(ctx, "/api/events", event, &response); err != nil {
		return nil, err
	}
	return &response, nil
}
*/

// forwardEvent now uses an EventClientInterface.
// The eventClient instance must be initialised and available.
func forwardEvent(ctx context.Context, eventClient /* EventClientInterface */ interface{}, eventData /* models.Event */ interface{}) error {
	log.Printf("Forwarding event: %+v", eventData)

	// Assume that eventClient is a valid instance of EventClientInterface
	// ec := eventClient.(EventClientInterface) // Type assertion
	// _, err := ec.SendEvent(ctx, eventData.(models.Event))

	// --- Start of placeholder code for the call ---
	// Replace with the real call through the eventClient interface once the implementation is available.
	// Conceptual example using BaseClient directly if EventClient does not exist yet:
	// err := baseClient.Post(ctx, "/api/events", eventData, nil)
	// For now, simulate the call:
	var err error // simulates the absence of an error
	log.Printf("Simulating event forwarding via API client for event: %+v", eventData)
	// --- End of placeholder code ---

	if err != nil {
		log.Printf("Forwarding event failed: %v", err)
		return err
	}
	log.Printf("Event forwarded successfully to MFO server.") // The response status would be in the object returned by SendEvent
	return nil
}

/*
// Example of calling forwardEvent from your webhook handler:
// webhook.go
// ...
// var myEventClient api.EventClientInterface // Initialised globally somewhere or via injection
// ...
// func WebhookHandler(w http.ResponseWriter, r *http.Request) {
// 	var event models.Event
// 	// ... decode the event ...
//
// 	go func() {
// 		err := forwardEvent(context.Background(), myEventClient, event)
// 		if err != nil {
// 			log.Printf("Asynchronous event forwarding failed: %v", err)
// 		}
// 	}()
// 	w.WriteHeader(http.StatusAccepted)
// }
// ...
*/
---
## Configuration
**Definition:**
The behaviour of the MFO client, including how it connects to the MFO server and how it handles retries and errors, is defined in YAML configuration files.
**Metaphor:**
**The Control Dashboard.** It allows fine-grained tuning of the client's communication and resilience parameters.
**YAML Configuration Structure (Client):**
The MFO API client configuration can include the following fields, drawing on section 4.3.1 of the Main Document:
```yaml
# Example client configuration (client_config.yaml)
client:
  base_url: "https://api.example.com"       # Base URL of the MFO server (for API calls)
  auth_base_url: "https://auth.example.com" # Base URL for authentication (if different)
  timeout: "30s"                            # Global timeout for HTTP requests
  # Retry configuration - see also section 2.7 of the Main Doc
  retry:
    max_attempts: 3
    initial_backoff: "1s"   # Initial delay before the first retry
    max_backoff: "1m"       # Maximum delay between retries
    backoff_multiplier: 2.0 # Multiplier for exponential backoff
  # Circuit breaker configuration - see also section 2.7 of the Main Doc
  circuit_breaker:
    failure_threshold: 5 # Number of failures before the circuit opens
    reset_timeout: "1m"  # How long the circuit stays open before moving to "half-open"
# Settings specific to the webhook listener (if the client has one)
webhook_listener:
  path: "/webhook" # Path on which to listen for incoming webhooks
  port: ":8080"    # Listening address for the webhook HTTP server
# Other application-specific settings can be added here
# logging:
#   level: "info"
#   format: "json"
```
Key Configuration Settings (Explanation):
| Key (under `client`) | Description | Example Value |
|---|---|---|
| `base_url` | The base URL of the MFO server to which API requests are sent. | `https://api.mfo.ai` |
| `auth_base_url` | (Optional) The base URL of the authentication service, if it differs from `base_url`. | `https://auth.mfo.ai` |
| `timeout` | The maximum time to wait for a response to an HTTP request before treating it as failed. | `30s`, `1m` |
| `retry.max_attempts` | The maximum number of times a failed request is retried. | `3`, `5` |
| `retry.initial_backoff` | The initial wait before the first retry after a failure. | `500ms`, `1s` |
| `retry.max_backoff` | The maximum wait between retries, to avoid indefinitely long delays. | `30s`, `1m` |
| `retry.backoff_multiplier` | The factor by which the backoff delay is multiplied on each attempt (for exponential backoff). | `1.5`, `2.0` |
| `circuit_breaker.failure_threshold` | The number of consecutive failures required for the circuit breaker to open. | `5`, `10` |
| `circuit_breaker.reset_timeout` | How long the circuit breaker stays open before attempting to reset ("half-open" state). | `1m`, `5m` |
Note: The `webhook_listener` settings (such as `path` and `port`) are specific to the client's webhook reception feature and can be structured separately from the configuration of the API client itself, as shown in the YAML example. Authentication configuration itself (for example, the credentials used to obtain a token) would be handled by the `auth` module and could come from environment variables or a secure configuration file rather than directly from this YAML, for security reasons.
Error Handling and Retry Logic
Robust error handling and retry mechanisms are essential for reliable interactions with the MFO server. The MFO client should rely on a clear strategy for handling network errors, server errors, and other exceptional conditions.
Metaphor: The Resilience Engineer. The system is designed not only to work, but also to handle failures gracefully and recover when possible.
Key Concepts of Error Handling
- Custom Error Types (sections 2.7.1 and 6.1.1 of the Main Doc):
  - Defining specific error types makes it possible to distinguish the nature of a problem and react appropriately.
  - `APIError`: Represents an error returned by the MFO server API (for example, HTTP status >= 400). It can contain an error code, a message, and details.
  - `ValidationError`: For input data validation errors.
  - `RetryableError`: An error that wraps an original error and indicates whether a retry is justified, potentially with a delay.
  - `CircuitBreakerError`: Indicates that the circuit breaker is open for a particular service.

// Example (to be defined in internal/models or an errors package)
// type APIError struct {
// 	StatusCode int                    `json:"status_code,omitempty"` // Added by the client or present in the response
// 	Code       string                 `json:"code,omitempty"`        // MFO application-specific error code
// 	Message    string                 `json:"message"`
// 	Details    map[string]interface{} `json:"details,omitempty"`
// }
// func (e *APIError) Error() string { /* ... */ }
- Retry Policy (`RetryPolicy`) (section 2.7.2 of the Main Doc):
  - Defines how and when failed operations should be retried.
  - Includes parameters such as:
    - `MaxAttempts`: Maximum number of attempts.
    - `InitialBackoff`: Delay before the first retry.
    - `MaxBackoff`: Maximum delay between attempts.
    - `BackoffMultiplier`: For exponential backoff.
    - A list of the error types considered "retryable".

// Concept (to be defined in internal/retry or a similar package)
// type RetryPolicy struct {
// 	MaxAttempts       int
// 	InitialBackoff    time.Duration
// 	MaxBackoff        time.Duration
// 	BackoffMultiplier float64
// 	RetryableErrors   []error // List of error types (e.g., &RetryableError{}, &models.APIError{StatusCode: 503})
// }
// func NewDefaultRetryPolicy() *RetryPolicy { /* ... */ }
// func (p *RetryPolicy) ShouldRetry(err error) bool { /* ... */ }
// func (p *RetryPolicy) GetBackoff(attempt int) time.Duration { /* ... */ }
- Circuit Breaker (Section 2.7.3 of the Main Doc):
  - A design pattern that prevents repeated calls to a service that is probably down.
  - After a certain number of failures (`failureThreshold`), the circuit "opens", and subsequent calls fail immediately for a period (`resetTimeout`).
  - After the timeout, the circuit moves to "half-open", allowing a limited number of test calls. If they succeed, the circuit closes; otherwise, it returns to the open state.
// Concept (should be defined in internal/retry or a similar package)
// type CircuitBreaker struct {
//     name             string
//     failureThreshold int
//     resetTimeout     time.Duration
//     // ... other internal state (failures, lastFailure, state, mu sync.RWMutex)
// }
// func NewCircuitBreaker(name string, failureThreshold int, resetTimeout time.Duration) *CircuitBreaker { /* ... */ }
// func (cb *CircuitBreaker) Execute(fn func() error) error { /* ... */ }
// func (cb *CircuitBreaker) allowRequest() bool { /* ... */ } // Internal logic
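The closed, open, and half-open transitions can be sketched with a small state machine. This is a simplified illustration under stated assumptions: the `ErrCircuitOpen` sentinel stands in for the `CircuitBreakerError` type, the internal field names are invented for the example, and the half-open state does not limit the number of concurrent trial calls.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrCircuitOpen stands in for the document's CircuitBreakerError type.
var ErrCircuitOpen = errors.New("circuit breaker is open")

// errDemo is a placeholder failure used by the usage example.
var errDemo = errors.New("backend unavailable")

type breakerState int

const (
	stateClosed breakerState = iota
	stateOpen
	stateHalfOpen
)

type CircuitBreaker struct {
	name             string
	failureThreshold int
	resetTimeout     time.Duration

	mu       sync.Mutex
	state    breakerState
	failures int
	openedAt time.Time
}

func NewCircuitBreaker(name string, failureThreshold int, resetTimeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{name: name, failureThreshold: failureThreshold, resetTimeout: resetTimeout}
}

// allowRequest reports whether a call may proceed, moving an open circuit
// to half-open once resetTimeout has elapsed.
func (cb *CircuitBreaker) allowRequest() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	if cb.state == stateOpen {
		if time.Since(cb.openedAt) < cb.resetTimeout {
			return false
		}
		cb.state = stateHalfOpen
	}
	return true
}

// Execute runs fn unless the circuit is open, then records the outcome.
func (cb *CircuitBreaker) Execute(fn func() error) error {
	if !cb.allowRequest() {
		return ErrCircuitOpen
	}
	err := fn()

	cb.mu.Lock()
	defer cb.mu.Unlock()
	if err == nil {
		cb.state, cb.failures = stateClosed, 0
		return nil
	}
	cb.failures++
	// A failed trial call, or too many failures while closed, opens the circuit.
	if cb.state == stateHalfOpen || cb.failures >= cb.failureThreshold {
		cb.state, cb.openedAt = stateOpen, time.Now()
	}
	return err
}

func main() {
	cb := NewCircuitBreaker("chat", 2, 50*time.Millisecond)
	fail := func() error { return errDemo }
	ok := func() error { return nil }

	fmt.Println(cb.Execute(fail)) // first failure, circuit still closed
	fmt.Println(cb.Execute(fail)) // second failure reaches the threshold
	fmt.Println(cb.Execute(ok))   // rejected without calling ok
	time.Sleep(60 * time.Millisecond)
	fmt.Println(cb.Execute(ok)) // trial call succeeds, circuit closes
}
```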
- Retry Executor (`RetryExecutor`) (Section 2.7.4 of the Main Doc):
  - Combines a `RetryPolicy` and a `CircuitBreaker` to run a given function with the configured resilience logic.
  - The `BaseClient` of the MFO API client could be configured to use such an executor for its HTTP operations.
// Concept (should be defined in internal/retry or a similar package)
// type RetryExecutor struct {
//     policy *RetryPolicy
//     cb     *CircuitBreaker // Optional; may be nil if unused
// }
// func NewRetryExecutor(policy *RetryPolicy, cb *CircuitBreaker) *RetryExecutor { /* ... */ }
// func (e *RetryExecutor) Execute(fn func() error) error { /* ... */ }
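A retry loop combining a policy with an optional breaker might look like the sketch below. To stay self-contained it makes several assumptions: the breaker is reduced to a hypothetical `Breaker` interface, the policy is held by value, and the `ShouldRetry` decision is modelled as a function field rather than a method. The `Execute(fn func() error) error` signature follows the concept above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a placeholder failure used by the usage example.
var errTransient = errors.New("transient failure")

// Breaker is the minimal behaviour the executor needs from a circuit breaker.
type Breaker interface {
	Execute(fn func() error) error
}

type RetryPolicy struct {
	MaxAttempts       int
	InitialBackoff    time.Duration
	MaxBackoff        time.Duration
	BackoffMultiplier float64
	// ShouldRetry is a hypothetical hook; nil means "retry every error".
	ShouldRetry func(error) bool
}

type RetryExecutor struct {
	policy RetryPolicy
	cb     Breaker // optional; nil disables the circuit breaker
}

// Execute calls fn until it succeeds, the policy refuses a retry, or
// MaxAttempts is reached. It returns the last error seen.
func (e *RetryExecutor) Execute(fn func() error) error {
	attempts := e.policy.MaxAttempts
	if attempts < 1 {
		attempts = 1 // always try at least once
	}
	backoff := e.policy.InitialBackoff
	var err error
	for attempt := 1; attempt <= attempts; attempt++ {
		if e.cb != nil {
			err = e.cb.Execute(fn)
		} else {
			err = fn()
		}
		if err == nil {
			return nil
		}
		if e.policy.ShouldRetry != nil && !e.policy.ShouldRetry(err) {
			return err
		}
		if attempt == attempts {
			break // no sleep after the final attempt
		}
		time.Sleep(backoff)
		backoff = time.Duration(float64(backoff) * e.policy.BackoffMultiplier)
		if backoff > e.policy.MaxBackoff {
			backoff = e.policy.MaxBackoff
		}
	}
	return err
}

func main() {
	calls := 0
	flaky := func() error { // fails twice, then succeeds
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	}
	exec := &RetryExecutor{policy: RetryPolicy{
		MaxAttempts:       5,
		InitialBackoff:    10 * time.Millisecond,
		MaxBackoff:        100 * time.Millisecond,
		BackoffMultiplier: 2,
	}}
	err := exec.Execute(flaky)
	fmt.Println(calls, err) // 3 <nil>
}
```

A `ShouldRetry` hook that returns false for circuit-open errors avoids sleeping and retrying while the breaker is rejecting calls. A variant that accepts a `context.Context` would also let callers cancel the wait between attempts.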
Integration into the MFO Client¶
- YAML Configuration: The parameters for `RetryPolicy` and `CircuitBreaker` can be loaded from the client's YAML configuration (see the "Configuration" section updated earlier, based on 4.3.1 of the Main Doc).
- API Client: The `BaseClient` (or the methods of the specialised clients) should use a `RetryExecutor` to wrap its HTTP calls. The error returned by `Execute` would then be the final error after all attempts, or a `CircuitBreakerError`.
- Specific Error Handling: After an API call, client code can inspect the type of the returned error (for example, using `errors.As` or `errors.Is`) to make specific decisions:
  - If it is an `APIError` with a 4xx status (except 429 Too Many Requests, which may be retryable), a retry is generally not useful.
  - If it is an `APIError` with a 5xx status, or a network error, a retry is often appropriate.
  - If it is a `CircuitBreakerError`, the circuit breaker considers the service unavailable.
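These rules of thumb can be collected into a single predicate. The sketch below is illustrative only: `isRetryable` is a hypothetical helper, the error types are reduced to the fields the example needs, and treating every `net.Error` as retryable is an assumption that a real client may want to narrow.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/http"
)

// APIError and CircuitBreakerError are trimmed-down stand-ins for the
// types described in this section.
type APIError struct {
	StatusCode int
	Message    string
}

func (e *APIError) Error() string { return fmt.Sprintf("api error %d: %s", e.StatusCode, e.Message) }

type CircuitBreakerError struct{ ServiceName string }

func (e *CircuitBreakerError) Error() string { return "circuit open for " + e.ServiceName }

// isRetryable applies the rules above: 429 and 5xx API errors and
// network errors are retryable; other 4xx errors and an open circuit are not.
func isRetryable(err error) bool {
	var cbErr *CircuitBreakerError
	if err == nil || errors.As(err, &cbErr) {
		return false
	}
	var apiErr *APIError
	if errors.As(err, &apiErr) {
		return apiErr.StatusCode == http.StatusTooManyRequests || apiErr.StatusCode >= 500
	}
	var netErr net.Error
	return errors.As(err, &netErr) // network-level failure
}

func main() {
	dialErr := &net.OpError{Op: "dial", Err: errors.New("connection refused")}

	fmt.Println(isRetryable(&APIError{StatusCode: http.StatusNotFound}))           // false
	fmt.Println(isRetryable(&APIError{StatusCode: http.StatusTooManyRequests}))    // true
	fmt.Println(isRetryable(&APIError{StatusCode: http.StatusServiceUnavailable})) // true
	fmt.Println(isRetryable(dialErr))                                              // true
	fmt.Println(isRetryable(&CircuitBreakerError{ServiceName: "chat"}))            // false
}
```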
Go Code Example: Error Handling after an API Call¶
// Assume 'chatClient' is an initialised instance of api.ChatClientInterface
// and that the retry/circuit breaker mechanisms are built into its implementation.
// import (
//     "context"
//     "errors"
//     "log"
//     "your_project_path/internal/models" // For APIError, etc.
//     "your_project_path/internal/retry"  // For CircuitBreakerError, RetryableError (if exported)
// )
// func makeChatRequest() {
//     chatReq := models.ChatRequest{Message: "Test", ThreadID: "123"}
//     chatResp, err := chatClient.SendMessage(context.Background(), chatReq)
//
//     if err != nil {
//         var apiErr *models.APIError
//         var cbErr *retry.CircuitBreakerError // Assumes this type is exported or otherwise identifiable
//
//         if errors.As(err, &apiErr) {
//             log.Printf("API error received: Status %d, Code '%s', Message '%s'", apiErr.StatusCode, apiErr.Code, apiErr.Message)
//             // Specific logic based on apiErr.StatusCode or apiErr.Code
//             if apiErr.StatusCode == 401 {
//                 log.Println("Authentication error. The token may have expired.")
//                 // Try to refresh the token or request a new login.
//             }
//         } else if errors.As(err, &cbErr) {
//             log.Printf("Circuit breaker is open for service: %s. State: %s", cbErr.ServiceName, cbErr.State)
//             // Logic for handling a temporarily unavailable service (e.g. notify, wait longer)
//         } else {
//             // Generic error (potentially a network error after retries are exhausted)
//             log.Printf("Failed to send message after retries: %v", err)
//         }
//         // Here the error could be escalated, queued for later processing, etc.
//         return
//     }
//
//     log.Printf("Message sent successfully: %s", chatResp.MessageID)
// }
Implementing error handling this thoroughly helps keep the MFO client resilient to transient problems and provides clear information when persistent errors occur. It is crucial that the `RetryPolicy` and `CircuitBreaker` settings (shown in the "Configuration" section) be tuned to the expected characteristics of the MFO server and the network.
Multi-Instance Strategy¶
Definition: You can deploy multiple Client instances, each independently configured.
Metaphor: Multiple Reception Desks. Each reception desk handles events from a different entrance or region.
Why?
- Isolate workloads (CRM events vs. IoT events)
- Scale horizontally to manage traffic bursts
- Deploy closer to data sources (geo-based clients)
Mermaid Diagram:
graph TD
A[Webhook Provider 1] --> B(Client A)
A2[Webhook Provider 2] --> B2(Client B)
B --> S(MFO Server)
B2 --> S
Taskflows & Webhooks¶
Definition:
- Taskflows: The workflow blueprint that describes how to process an event once received.
- Webhooks: The delivery method that gets the event into the Client.
Example Taskflow YAML:
name: "UserSignupFlow"
tasks:
  - id: "verify_email"
    type: "email_verification"
    config:
      retry_count: 3
  - id: "send_welcome_email"
    type: "notification"
    dependencies: ["verify_email"]
Mermaid Diagram:
graph TD
Event(Webhook: User Signup) --> Verify[Verify Email]
Verify --> Welcome[Send Welcome Email]
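The `dependencies` field implies that a task runs only after the tasks it lists. The document does not describe how the taskflow engine schedules tasks, so the following is only a sketch of one common approach, a topological sort (Kahn's algorithm). The `Task` struct and the `executionOrder` function are hypothetical names, and task IDs are assumed to be unique.

```go
package main

import (
	"errors"
	"fmt"
)

// Task mirrors the fields used in the YAML example (config omitted).
type Task struct {
	ID           string
	Type         string
	Dependencies []string
}

// executionOrder returns the task IDs ordered so that every task appears
// after all of its dependencies. It fails on unknown dependencies and cycles.
func executionOrder(tasks []Task) ([]string, error) {
	indegree := make(map[string]int, len(tasks))
	dependents := make(map[string][]string)
	for _, t := range tasks {
		indegree[t.ID] = 0
	}
	for _, t := range tasks {
		for _, dep := range t.Dependencies {
			if _, ok := indegree[dep]; !ok {
				return nil, fmt.Errorf("task %q depends on unknown task %q", t.ID, dep)
			}
			indegree[t.ID]++
			dependents[dep] = append(dependents[dep], t.ID)
		}
	}

	var queue, order []string
	for _, t := range tasks { // iterate the slice, not the map, for a stable order
		if indegree[t.ID] == 0 {
			queue = append(queue, t.ID)
		}
	}
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		order = append(order, id)
		for _, next := range dependents[id] {
			indegree[next]--
			if indegree[next] == 0 {
				queue = append(queue, next)
			}
		}
	}
	if len(order) != len(tasks) {
		return nil, errors.New("dependency cycle detected")
	}
	return order, nil
}

func main() {
	// The tasks of UserSignupFlow, deliberately listed out of order.
	tasks := []Task{
		{ID: "send_welcome_email", Type: "notification", Dependencies: []string{"verify_email"}},
		{ID: "verify_email", Type: "email_verification"},
	}
	order, err := executionOrder(tasks)
	fmt.Println(order, err) // [verify_email send_welcome_email] <nil>
}
```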
These core concepts help keep the MindFlight AI Client lightweight, scalable, and easy to configure, making it a robust entry point into your MindFlight AI ecosystem.