Files
kanban/backend/modules.go
T
egutierrez c9e15513c7 chore: auto-commit (23 archivos)
- app.md
- backend/dist/assets/index-CFDWXN9Z.js
- backend/dist/index.html
- backend/handlers.go
- backend/main.go
- backend/users.go
- e2e/smoke_live.sh
- frontend/src/App.tsx
- frontend/src/api.ts
- frontend/src/components/CardChatPanel.tsx
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 18:22:44 +02:00

726 lines
19 KiB
Go

package main
import (
"bytes"
"context"
"database/sql"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strings"
"time"
)
// =============================================================================
// Module model
// =============================================================================
type Module struct {
ID string `json:"id"`
Name string `json:"name"`
Kind string `json:"kind"`
Enabled bool `json:"enabled"`
EventFilter []string `json:"event_filter"`
Config JSONValue `json:"config"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
// JSONValue is an arbitrary JSON object decoded into a generic map. We do not
// model per-kind config in Go types because the set of kinds grows over time
// and the dispatcher only inspects fields it knows.
type JSONValue map[string]interface{}
type ModuleLog struct {
ID string `json:"id"`
ModuleID string `json:"module_id"`
EventType string `json:"event_type"`
CardID string `json:"card_id"`
Status int `json:"status"`
DurationMs int `json:"duration_ms"`
Error string `json:"error"`
CreatedAt string `json:"created_at"`
}
// =============================================================================
// DB helpers (modules + logs)
// =============================================================================
// listModulesEnabled returns all enabled modules with their config decrypted.
// Disabled modules are silently skipped — callers iterate the result without
// further filtering.
func (db *DB) listModulesEnabled() ([]Module, error) {
return db.listModulesWhere("WHERE enabled = 1")
}
func (db *DB) listModulesAll() ([]Module, error) {
return db.listModulesWhere("")
}
func (db *DB) listModulesWhere(filter string) ([]Module, error) {
q := `SELECT id, name, kind, enabled, event_filter, config_cipher, config_nonce, created_at, updated_at FROM modules ` + filter + ` ORDER BY created_at`
rows, err := db.conn.Query(q)
if err != nil {
return nil, err
}
defer rows.Close()
out := []Module{}
for rows.Next() {
var m Module
var enabled int
var filter, createdAt, updatedAt string
var cipherBlob, nonce []byte
if err := rows.Scan(&m.ID, &m.Name, &m.Kind, &enabled, &filter, &cipherBlob, &nonce, &createdAt, &updatedAt); err != nil {
return nil, err
}
m.Enabled = enabled == 1
m.EventFilter = splitCSV(filter)
m.CreatedAt = createdAt
m.UpdatedAt = updatedAt
cfg, err := decryptConfig(cipherBlob, nonce)
if err != nil {
// Surface the decrypt failure so the operator notices but
// avoid dropping the module from the list entirely.
log.Printf("module %s: decrypt config: %v", m.ID, err)
m.Config = JSONValue{"_decrypt_error": err.Error()}
} else {
_ = json.Unmarshal(cfg, &m.Config)
}
out = append(out, m)
}
return out, rows.Err()
}
func (db *DB) getModule(id string) (*Module, error) {
mods, err := db.listModulesWhere(`WHERE id = '` + escapeSQL(id) + `'`)
if err != nil || len(mods) == 0 {
if err == nil {
err = sql.ErrNoRows
}
return nil, err
}
return &mods[0], nil
}
func escapeSQL(s string) string {
return strings.ReplaceAll(s, "'", "''")
}
func (db *DB) saveModule(m *Module) error {
cfgJSON, err := json.Marshal(m.Config)
if err != nil {
return err
}
cipherBlob, nonce, err := encryptConfig(cfgJSON)
if err != nil {
return err
}
now := nowRFC3339()
if m.ID == "" {
m.ID = newID()
m.CreatedAt = now
m.UpdatedAt = now
_, err = db.conn.Exec(
`INSERT INTO modules (id, name, kind, enabled, event_filter, config_cipher, config_nonce, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
m.ID, m.Name, m.Kind, boolInt(m.Enabled), strings.Join(m.EventFilter, ","),
cipherBlob, nonce, m.CreatedAt, m.UpdatedAt,
)
return err
}
m.UpdatedAt = now
_, err = db.conn.Exec(
`UPDATE modules SET name=?, kind=?, enabled=?, event_filter=?, config_cipher=?, config_nonce=?, updated_at=? WHERE id=?`,
m.Name, m.Kind, boolInt(m.Enabled), strings.Join(m.EventFilter, ","),
cipherBlob, nonce, m.UpdatedAt, m.ID,
)
return err
}
func (db *DB) deleteModule(id string) error {
_, err := db.conn.Exec(`DELETE FROM modules WHERE id=?`, id)
return err
}
func (db *DB) appendModuleLog(l ModuleLog) error {
if l.ID == "" {
l.ID = newID()
}
if l.CreatedAt == "" {
l.CreatedAt = nowRFC3339()
}
_, err := db.conn.Exec(
`INSERT INTO module_logs (id, module_id, event_type, card_id, status, duration_ms, error, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
l.ID, l.ModuleID, l.EventType, l.CardID, l.Status, l.DurationMs, l.Error, l.CreatedAt,
)
return err
}
func (db *DB) listModuleLogs(moduleID string, limit int) ([]ModuleLog, error) {
if limit <= 0 {
limit = 100
}
rows, err := db.conn.Query(
`SELECT id, module_id, event_type, card_id, status, duration_ms, error, created_at
FROM module_logs WHERE module_id = ? ORDER BY created_at DESC LIMIT ?`,
moduleID, limit,
)
if err != nil {
return nil, err
}
defer rows.Close()
out := []ModuleLog{}
for rows.Next() {
var l ModuleLog
var cardID sql.NullString
if err := rows.Scan(&l.ID, &l.ModuleID, &l.EventType, &cardID, &l.Status, &l.DurationMs, &l.Error, &l.CreatedAt); err != nil {
return nil, err
}
if cardID.Valid {
l.CardID = cardID.String
}
out = append(out, l)
}
return out, rows.Err()
}
// setCardJiraKey stores the Jira issue key for a card after a successful
// create call. We skip the regular UpdateCard path to avoid emitting a
// `card.updated` event (which would loop us back through the dispatcher).
func (db *DB) setCardJiraKey(cardID, jiraKey string) error {
_, err := db.conn.Exec(`UPDATE cards SET jira_key=? WHERE id=?`, jiraKey, cardID)
return err
}
func (db *DB) getCardForJira(cardID string) (*cardForJira, error) {
var c cardForJira
var assignee, deadline, jiraKey sql.NullString
var tagsJSON string
err := db.conn.QueryRow(
`SELECT c.id, c.title, c.description, c.requester, c.column_id, c.assignee_id,
c.deadline, c.tags, c.jira_key, c.created_at, col.name
FROM cards c JOIN columns col ON col.id = c.column_id WHERE c.id = ?`,
cardID,
).Scan(&c.ID, &c.Title, &c.Description, &c.Requester, &c.ColumnID, &assignee,
&deadline, &tagsJSON, &jiraKey, &c.CreatedAt, &c.ColumnName)
if err != nil {
return nil, err
}
if assignee.Valid {
c.AssigneeID = assignee.String
}
if deadline.Valid {
c.Deadline = deadline.String
}
if jiraKey.Valid {
c.JiraKey = jiraKey.String
}
_ = json.Unmarshal([]byte(tagsJSON), &c.Tags)
return &c, nil
}
type cardForJira struct {
ID string
Title string
Description string
Requester string
ColumnID string
ColumnName string
AssigneeID string
Deadline string
Tags []string
JiraKey string
CreatedAt string
}
func (c *cardForJira) hasTag(name string) bool {
name = strings.ToLower(name)
for _, t := range c.Tags {
if strings.ToLower(t) == name {
return true
}
}
return false
}
func splitCSV(s string) []string {
if s == "" {
return nil
}
parts := strings.Split(s, ",")
out := make([]string, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p != "" {
out = append(out, p)
}
}
return out
}
func boolInt(b bool) int {
if b {
return 1
}
return 0
}
// =============================================================================
// Dispatcher
// =============================================================================
const (
moduleRetries = 3
moduleRetryDelay1 = 1 * time.Second
moduleRetryDelay2 = 5 * time.Second
moduleRetryDelay3 = 30 * time.Second
moduleHTTPTimeout = 15 * time.Second
moduleOptOutTag = "nojira"
moduleDispatchQueue = 256
)
// Dispatcher fans events from the EventHub into per-module handlers.
//
// Lifecycle:
// - Start() spawns a single subscriber goroutine on the hub plus a
// bounded worker pool.
// - Stop() cancels the context and waits for in-flight requests to drain.
//
// Handlers receive a decrypted Module copy + the Event; they own the HTTP
// call to the target system. The dispatcher logs every attempt.
type Dispatcher struct {
db *DB
hub *EventHub
handlers map[string]Handler
queue chan dispatchTask
ctx context.Context
cancel context.CancelFunc
enabled bool
}
type dispatchTask struct {
module Module
event Event
}
type Handler interface {
Handle(ctx context.Context, db *DB, m Module, ev Event) (status int, err error)
TestConnection(ctx context.Context, m Module) (status int, err error)
}
func NewDispatcher(db *DB, hub *EventHub) *Dispatcher {
_, hasKey := moduleKey()
return &Dispatcher{
db: db,
hub: hub,
handlers: map[string]Handler{"jira": &jiraHandler{}},
queue: make(chan dispatchTask, moduleDispatchQueue),
enabled: hasKey,
}
}
func (d *Dispatcher) Start() {
if !d.enabled {
log.Printf("module dispatcher disabled (%s not set)", moduleKeyEnv)
return
}
d.ctx, d.cancel = context.WithCancel(context.Background())
// Subscribe under a synthetic user so the hub treats us as a normal
// recipient of broadcast events. Private user-targeted events are
// uninteresting for outbound sync.
go d.run()
for i := 0; i < 4; i++ {
go d.worker(i)
}
log.Printf("module dispatcher started")
}
func (d *Dispatcher) Stop() {
if d.cancel != nil {
d.cancel()
}
}
func (d *Dispatcher) run() {
ch := d.hub.SubscribeUser("__module_dispatcher__")
defer d.hub.UnsubscribeUser("__module_dispatcher__", ch)
for {
select {
case <-d.ctx.Done():
return
case ev, ok := <-ch:
if !ok {
return
}
d.fanout(ev)
}
}
}
func (d *Dispatcher) fanout(ev Event) {
mods, err := d.db.listModulesEnabled()
if err != nil {
log.Printf("dispatcher: listModulesEnabled: %v", err)
return
}
for _, m := range mods {
if !filterMatches(m.EventFilter, ev.Type) {
continue
}
if !cutoffOK(d.db, m, ev) {
continue
}
if ev.CardID != "" {
c, err := d.db.getCardForJira(ev.CardID)
if err == nil && c.hasTag(moduleOptOutTag) {
continue
}
}
select {
case d.queue <- dispatchTask{module: m, event: ev}:
default:
log.Printf("dispatcher: queue full, dropping event %s for module %s", ev.Type, m.ID)
}
}
}
func (d *Dispatcher) worker(id int) {
for {
select {
case <-d.ctx.Done():
return
case task, ok := <-d.queue:
if !ok {
return
}
d.dispatch(task)
}
}
}
// dispatch runs the handler with up to moduleRetries attempts using a
// fixed back-off schedule (1s, 5s, 30s). Each attempt creates a log row;
// the final outcome is the one returned to the caller.
func (d *Dispatcher) dispatch(t dispatchTask) {
h, ok := d.handlers[t.module.Kind]
if !ok {
_ = d.db.appendModuleLog(ModuleLog{
ModuleID: t.module.ID, EventType: t.event.Type, CardID: t.event.CardID,
Error: "unknown module kind: " + t.module.Kind,
})
return
}
delays := []time.Duration{0, moduleRetryDelay1, moduleRetryDelay2, moduleRetryDelay3}
for attempt := 0; attempt < moduleRetries; attempt++ {
if delays[attempt] > 0 {
select {
case <-d.ctx.Done():
return
case <-time.After(delays[attempt]):
}
}
ctx, cancel := context.WithTimeout(d.ctx, moduleHTTPTimeout)
start := time.Now()
status, err := h.Handle(ctx, d.db, t.module, t.event)
cancel()
ml := ModuleLog{
ModuleID: t.module.ID, EventType: t.event.Type, CardID: t.event.CardID,
Status: status, DurationMs: int(time.Since(start).Milliseconds()),
}
if err != nil {
ml.Error = err.Error()
}
_ = d.db.appendModuleLog(ml)
if err == nil {
return
}
// 4xx client errors are not worth retrying.
if status >= 400 && status < 500 {
return
}
}
}
// =============================================================================
// Helpers
// =============================================================================
func filterMatches(filter []string, eventType string) bool {
for _, f := range filter {
if f == eventType || f == "*" {
return true
}
}
return false
}
// cutoffOK applies the "module only sees events posterior to its creation"
// rule. Cards that were already linked to Jira (jira_key != "") are always
// eligible regardless of timestamps.
func cutoffOK(db *DB, m Module, ev Event) bool {
if ev.CardID == "" {
return true
}
c, err := db.getCardForJira(ev.CardID)
if err != nil {
return false
}
if c.JiraKey != "" {
return true
}
return c.CreatedAt >= m.CreatedAt
}
// =============================================================================
// Jira handler
// =============================================================================
type jiraHandler struct{}
type jiraConfig struct {
BaseURL string `json:"base_url"`
Email string `json:"email"`
APIToken string `json:"api_token"`
ProjectKey string `json:"project_key"`
StatusMap map[string]string `json:"status_map"`
}
func parseJiraConfig(m Module) (jiraConfig, error) {
b, err := json.Marshal(m.Config)
if err != nil {
return jiraConfig{}, err
}
var c jiraConfig
if err := json.Unmarshal(b, &c); err != nil {
return jiraConfig{}, err
}
c.BaseURL = strings.TrimRight(c.BaseURL, "/")
if c.BaseURL == "" {
return c, fmt.Errorf("base_url required")
}
return c, nil
}
func (h *jiraHandler) jiraRequest(ctx context.Context, c jiraConfig, method, path string, body interface{}) (int, []byte, error) {
var rdr io.Reader
if body != nil {
b, err := json.Marshal(body)
if err != nil {
return 0, nil, err
}
rdr = bytes.NewReader(b)
}
req, err := http.NewRequestWithContext(ctx, method, c.BaseURL+path, rdr)
if err != nil {
return 0, nil, err
}
req.Header.Set("Accept", "application/json")
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
if c.Email != "" && c.APIToken != "" {
basic := base64.StdEncoding.EncodeToString([]byte(c.Email + ":" + c.APIToken))
req.Header.Set("Authorization", "Basic "+basic)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return 0, nil, err
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
if resp.StatusCode >= 400 {
return resp.StatusCode, respBody, fmt.Errorf("jira %s %s: %d %s", method, path, resp.StatusCode, truncate(respBody, 240))
}
return resp.StatusCode, respBody, nil
}
func truncate(b []byte, n int) string {
if len(b) <= n {
return string(b)
}
return string(b[:n]) + "…"
}
func (h *jiraHandler) TestConnection(ctx context.Context, m Module) (int, error) {
c, err := parseJiraConfig(m)
if err != nil {
return 0, err
}
status, _, err := h.jiraRequest(ctx, c, http.MethodGet, "/rest/api/3/myself", nil)
return status, err
}
func (h *jiraHandler) Handle(ctx context.Context, db *DB, m Module, ev Event) (int, error) {
c, err := parseJiraConfig(m)
if err != nil {
return 0, err
}
switch ev.Type {
case "card.created":
return h.create(ctx, db, c, ev)
case "card.updated", "board.invalidated":
return h.update(ctx, db, c, ev)
case "card.moved":
return h.transition(ctx, db, c, ev)
case "message.created":
return h.comment(ctx, db, c, ev)
default:
// Silently ignore unhandled event types so the dispatcher does not
// retry on irrelevant traffic.
return 200, nil
}
}
func (h *jiraHandler) create(ctx context.Context, db *DB, c jiraConfig, ev Event) (int, error) {
if ev.CardID == "" {
return 0, nil
}
card, err := db.getCardForJira(ev.CardID)
if err != nil {
return 0, err
}
if card.JiraKey != "" {
// Idempotent: card already linked to Jira; treat as update.
return h.update(ctx, db, c, ev)
}
if c.ProjectKey == "" {
return 0, fmt.Errorf("project_key required for create")
}
body := map[string]interface{}{
"fields": map[string]interface{}{
"project": map[string]string{"key": c.ProjectKey},
"summary": card.Title,
"description": adfText(card.Description),
"issuetype": map[string]string{"name": "Task"},
},
}
status, resp, err := h.jiraRequest(ctx, c, http.MethodPost, "/rest/api/3/issue", body)
if err != nil {
return status, err
}
var parsed struct {
Key string `json:"key"`
}
_ = json.Unmarshal(resp, &parsed)
if parsed.Key != "" {
if err := db.setCardJiraKey(card.ID, parsed.Key); err != nil {
return status, fmt.Errorf("link jira key: %w", err)
}
}
return status, nil
}
func (h *jiraHandler) update(ctx context.Context, db *DB, c jiraConfig, ev Event) (int, error) {
if ev.CardID == "" {
return 0, nil
}
card, err := db.getCardForJira(ev.CardID)
if err != nil {
return 0, err
}
if card.JiraKey == "" {
// Card not yet linked — bootstrap by creating it.
return h.create(ctx, db, c, ev)
}
body := map[string]interface{}{
"fields": map[string]interface{}{
"summary": card.Title,
"description": adfText(card.Description),
},
}
status, _, err := h.jiraRequest(ctx, c, http.MethodPut, "/rest/api/3/issue/"+card.JiraKey, body)
return status, err
}
// transition uses the configured status_map to translate the kanban column
// to a Jira transition name. We list available transitions, find the one
// whose target status name matches, and POST it. Kanban remains the source
// of truth even if Jira's current state differs.
func (h *jiraHandler) transition(ctx context.Context, db *DB, c jiraConfig, ev Event) (int, error) {
if ev.CardID == "" {
return 0, nil
}
card, err := db.getCardForJira(ev.CardID)
if err != nil {
return 0, err
}
if card.JiraKey == "" {
return h.create(ctx, db, c, ev)
}
target, ok := c.StatusMap[card.ColumnName]
if !ok || target == "" {
return 0, fmt.Errorf("no status_map entry for column %q", card.ColumnName)
}
status, body, err := h.jiraRequest(ctx, c, http.MethodGet, "/rest/api/3/issue/"+card.JiraKey+"/transitions", nil)
if err != nil {
return status, err
}
var available struct {
Transitions []struct {
ID string `json:"id"`
Name string `json:"name"`
To struct {
Name string `json:"name"`
} `json:"to"`
} `json:"transitions"`
}
if err := json.Unmarshal(body, &available); err != nil {
return status, fmt.Errorf("decode transitions: %w", err)
}
var tID string
for _, t := range available.Transitions {
if strings.EqualFold(t.To.Name, target) || strings.EqualFold(t.Name, target) {
tID = t.ID
break
}
}
if tID == "" {
return 0, fmt.Errorf("transition %q not available for %s", target, card.JiraKey)
}
req := map[string]interface{}{"transition": map[string]string{"id": tID}}
status, _, err = h.jiraRequest(ctx, c, http.MethodPost, "/rest/api/3/issue/"+card.JiraKey+"/transitions", req)
return status, err
}
func (h *jiraHandler) comment(ctx context.Context, db *DB, c jiraConfig, ev Event) (int, error) {
if ev.CardID == "" {
return 0, nil
}
card, err := db.getCardForJira(ev.CardID)
if err != nil {
return 0, err
}
if card.JiraKey == "" {
// Cannot comment on a card not yet synced; skip.
return 0, nil
}
var payload struct {
Body string `json:"body"`
}
_ = json.Unmarshal(ev.Payload, &payload)
if payload.Body == "" {
return 0, nil
}
body := map[string]interface{}{"body": adfText(payload.Body)}
status, _, err := h.jiraRequest(ctx, c, http.MethodPost, "/rest/api/3/issue/"+card.JiraKey+"/comment", body)
return status, err
}
// adfText wraps a plain string into the minimal Atlassian Document Format
// fragment Jira Cloud requires for description / comment bodies.
func adfText(s string) map[string]interface{} {
return map[string]interface{}{
"type": "doc",
"version": 1,
"content": []map[string]interface{}{{
"type": "paragraph",
"content": []map[string]interface{}{{
"type": "text",
"text": s,
}},
}},
}
}