package main import ( "bytes" "context" "database/sql" "encoding/base64" "encoding/json" "fmt" "io" "log" "net/http" "strings" "time" ) // ============================================================================= // Module model // ============================================================================= type Module struct { ID string `json:"id"` Name string `json:"name"` Kind string `json:"kind"` Enabled bool `json:"enabled"` EventFilter []string `json:"event_filter"` Config JSONValue `json:"config"` CreatedAt string `json:"created_at"` UpdatedAt string `json:"updated_at"` } // JSONValue is an arbitrary JSON object decoded into a generic map. We do not // model per-kind config in Go types because the set of kinds grows over time // and the dispatcher only inspects fields it knows. type JSONValue map[string]interface{} type ModuleLog struct { ID string `json:"id"` ModuleID string `json:"module_id"` EventType string `json:"event_type"` CardID string `json:"card_id"` Status int `json:"status"` DurationMs int `json:"duration_ms"` Error string `json:"error"` CreatedAt string `json:"created_at"` } // ============================================================================= // DB helpers (modules + logs) // ============================================================================= // listModulesEnabled returns all enabled modules with their config decrypted. // Disabled modules are silently skipped — callers iterate the result without // further filtering. func (db *DB) listModulesEnabled() ([]Module, error) { return db.listModulesWhere("WHERE enabled = 1") } func (db *DB) listModulesAll() ([]Module, error) { return db.listModulesWhere("") } func (db *DB) listModulesWhere(filter string) ([]Module, error) { q := `SELECT id, name, kind, enabled, event_filter, config_cipher, config_nonce, created_at, updated_at FROM modules ` + filter + ` ORDER BY created_at` rows, err := db.conn.Query(q) if err != nil { return nil, err } defer rows.Close() out := []Module{} for rows.Next() { var m Module var enabled int var filter, createdAt, updatedAt string var cipherBlob, nonce []byte if err := rows.Scan(&m.ID, &m.Name, &m.Kind, &enabled, &filter, &cipherBlob, &nonce, &createdAt, &updatedAt); err != nil { return nil, err } m.Enabled = enabled == 1 m.EventFilter = splitCSV(filter) m.CreatedAt = createdAt m.UpdatedAt = updatedAt cfg, err := decryptConfig(cipherBlob, nonce) if err != nil { // Surface the decrypt failure so the operator notices but // avoid dropping the module from the list entirely. log.Printf("module %s: decrypt config: %v", m.ID, err) m.Config = JSONValue{"_decrypt_error": err.Error()} } else { _ = json.Unmarshal(cfg, &m.Config) } out = append(out, m) } return out, rows.Err() } func (db *DB) getModule(id string) (*Module, error) { mods, err := db.listModulesWhere(`WHERE id = '` + escapeSQL(id) + `'`) if err != nil || len(mods) == 0 { if err == nil { err = sql.ErrNoRows } return nil, err } return &mods[0], nil } func escapeSQL(s string) string { return strings.ReplaceAll(s, "'", "''") } func (db *DB) saveModule(m *Module) error { cfgJSON, err := json.Marshal(m.Config) if err != nil { return err } cipherBlob, nonce, err := encryptConfig(cfgJSON) if err != nil { return err } now := nowRFC3339() if m.ID == "" { m.ID = newID() m.CreatedAt = now m.UpdatedAt = now _, err = db.conn.Exec( `INSERT INTO modules (id, name, kind, enabled, event_filter, config_cipher, config_nonce, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, m.ID, m.Name, m.Kind, boolInt(m.Enabled), strings.Join(m.EventFilter, ","), cipherBlob, nonce, m.CreatedAt, m.UpdatedAt, ) return err } m.UpdatedAt = now _, err = db.conn.Exec( `UPDATE modules SET name=?, kind=?, enabled=?, event_filter=?, config_cipher=?, config_nonce=?, updated_at=? WHERE id=?`, m.Name, m.Kind, boolInt(m.Enabled), strings.Join(m.EventFilter, ","), cipherBlob, nonce, m.UpdatedAt, m.ID, ) return err } func (db *DB) deleteModule(id string) error { _, err := db.conn.Exec(`DELETE FROM modules WHERE id=?`, id) return err } func (db *DB) appendModuleLog(l ModuleLog) error { if l.ID == "" { l.ID = newID() } if l.CreatedAt == "" { l.CreatedAt = nowRFC3339() } _, err := db.conn.Exec( `INSERT INTO module_logs (id, module_id, event_type, card_id, status, duration_ms, error, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, l.ID, l.ModuleID, l.EventType, l.CardID, l.Status, l.DurationMs, l.Error, l.CreatedAt, ) return err } func (db *DB) listModuleLogs(moduleID string, limit int) ([]ModuleLog, error) { if limit <= 0 { limit = 100 } rows, err := db.conn.Query( `SELECT id, module_id, event_type, card_id, status, duration_ms, error, created_at FROM module_logs WHERE module_id = ? ORDER BY created_at DESC LIMIT ?`, moduleID, limit, ) if err != nil { return nil, err } defer rows.Close() out := []ModuleLog{} for rows.Next() { var l ModuleLog var cardID sql.NullString if err := rows.Scan(&l.ID, &l.ModuleID, &l.EventType, &cardID, &l.Status, &l.DurationMs, &l.Error, &l.CreatedAt); err != nil { return nil, err } if cardID.Valid { l.CardID = cardID.String } out = append(out, l) } return out, rows.Err() } // setCardJiraKey stores the Jira issue key for a card after a successful // create call. We skip the regular UpdateCard path to avoid emitting a // `card.updated` event (which would loop us back through the dispatcher). func (db *DB) setCardJiraKey(cardID, jiraKey string) error { _, err := db.conn.Exec(`UPDATE cards SET jira_key=? WHERE id=?`, jiraKey, cardID) return err } func (db *DB) getCardForJira(cardID string) (*cardForJira, error) { var c cardForJira var assignee, deadline, jiraKey sql.NullString var tagsJSON string err := db.conn.QueryRow( `SELECT c.id, c.title, c.description, c.requester, c.column_id, c.assignee_id, c.deadline, c.tags, c.jira_key, c.created_at, col.name FROM cards c JOIN columns col ON col.id = c.column_id WHERE c.id = ?`, cardID, ).Scan(&c.ID, &c.Title, &c.Description, &c.Requester, &c.ColumnID, &assignee, &deadline, &tagsJSON, &jiraKey, &c.CreatedAt, &c.ColumnName) if err != nil { return nil, err } if assignee.Valid { c.AssigneeID = assignee.String } if deadline.Valid { c.Deadline = deadline.String } if jiraKey.Valid { c.JiraKey = jiraKey.String } _ = json.Unmarshal([]byte(tagsJSON), &c.Tags) return &c, nil } type cardForJira struct { ID string Title string Description string Requester string ColumnID string ColumnName string AssigneeID string Deadline string Tags []string JiraKey string CreatedAt string } func (c *cardForJira) hasTag(name string) bool { name = strings.ToLower(name) for _, t := range c.Tags { if strings.ToLower(t) == name { return true } } return false } func splitCSV(s string) []string { if s == "" { return nil } parts := strings.Split(s, ",") out := make([]string, 0, len(parts)) for _, p := range parts { p = strings.TrimSpace(p) if p != "" { out = append(out, p) } } return out } func boolInt(b bool) int { if b { return 1 } return 0 } // ============================================================================= // Dispatcher // ============================================================================= const ( moduleRetries = 3 moduleRetryDelay1 = 1 * time.Second moduleRetryDelay2 = 5 * time.Second moduleRetryDelay3 = 30 * time.Second moduleHTTPTimeout = 15 * time.Second moduleOptOutTag = "nojira" moduleDispatchQueue = 256 ) // Dispatcher fans events from the EventHub into per-module handlers. // // Lifecycle: // - Start() spawns a single subscriber goroutine on the hub plus a // bounded worker pool. // - Stop() cancels the context and waits for in-flight requests to drain. // // Handlers receive a decrypted Module copy + the Event; they own the HTTP // call to the target system. The dispatcher logs every attempt. type Dispatcher struct { db *DB hub *EventHub handlers map[string]Handler queue chan dispatchTask ctx context.Context cancel context.CancelFunc enabled bool } type dispatchTask struct { module Module event Event } type Handler interface { Handle(ctx context.Context, db *DB, m Module, ev Event) (status int, err error) TestConnection(ctx context.Context, m Module) (status int, err error) } func NewDispatcher(db *DB, hub *EventHub) *Dispatcher { _, hasKey := moduleKey() return &Dispatcher{ db: db, hub: hub, handlers: map[string]Handler{"jira": &jiraHandler{}}, queue: make(chan dispatchTask, moduleDispatchQueue), enabled: hasKey, } } func (d *Dispatcher) Start() { if !d.enabled { log.Printf("module dispatcher disabled (%s not set)", moduleKeyEnv) return } d.ctx, d.cancel = context.WithCancel(context.Background()) // Subscribe under a synthetic user so the hub treats us as a normal // recipient of broadcast events. Private user-targeted events are // uninteresting for outbound sync. go d.run() for i := 0; i < 4; i++ { go d.worker(i) } log.Printf("module dispatcher started") } func (d *Dispatcher) Stop() { if d.cancel != nil { d.cancel() } } func (d *Dispatcher) run() { ch := d.hub.SubscribeUser("__module_dispatcher__") defer d.hub.UnsubscribeUser("__module_dispatcher__", ch) for { select { case <-d.ctx.Done(): return case ev, ok := <-ch: if !ok { return } d.fanout(ev) } } } func (d *Dispatcher) fanout(ev Event) { mods, err := d.db.listModulesEnabled() if err != nil { log.Printf("dispatcher: listModulesEnabled: %v", err) return } for _, m := range mods { if !filterMatches(m.EventFilter, ev.Type) { continue } if !cutoffOK(d.db, m, ev) { continue } if ev.CardID != "" { c, err := d.db.getCardForJira(ev.CardID) if err == nil && c.hasTag(moduleOptOutTag) { continue } } select { case d.queue <- dispatchTask{module: m, event: ev}: default: log.Printf("dispatcher: queue full, dropping event %s for module %s", ev.Type, m.ID) } } } func (d *Dispatcher) worker(id int) { for { select { case <-d.ctx.Done(): return case task, ok := <-d.queue: if !ok { return } d.dispatch(task) } } } // dispatch runs the handler with up to moduleRetries attempts using a // fixed back-off schedule (1s, 5s, 30s). Each attempt creates a log row; // the final outcome is the one returned to the caller. func (d *Dispatcher) dispatch(t dispatchTask) { h, ok := d.handlers[t.module.Kind] if !ok { _ = d.db.appendModuleLog(ModuleLog{ ModuleID: t.module.ID, EventType: t.event.Type, CardID: t.event.CardID, Error: "unknown module kind: " + t.module.Kind, }) return } delays := []time.Duration{0, moduleRetryDelay1, moduleRetryDelay2, moduleRetryDelay3} for attempt := 0; attempt < moduleRetries; attempt++ { if delays[attempt] > 0 { select { case <-d.ctx.Done(): return case <-time.After(delays[attempt]): } } ctx, cancel := context.WithTimeout(d.ctx, moduleHTTPTimeout) start := time.Now() status, err := h.Handle(ctx, d.db, t.module, t.event) cancel() ml := ModuleLog{ ModuleID: t.module.ID, EventType: t.event.Type, CardID: t.event.CardID, Status: status, DurationMs: int(time.Since(start).Milliseconds()), } if err != nil { ml.Error = err.Error() } _ = d.db.appendModuleLog(ml) if err == nil { return } // 4xx client errors are not worth retrying. if status >= 400 && status < 500 { return } } } // ============================================================================= // Helpers // ============================================================================= func filterMatches(filter []string, eventType string) bool { for _, f := range filter { if f == eventType || f == "*" { return true } } return false } // cutoffOK applies the "module only sees events posterior to its creation" // rule. Cards that were already linked to Jira (jira_key != "") are always // eligible regardless of timestamps. func cutoffOK(db *DB, m Module, ev Event) bool { if ev.CardID == "" { return true } c, err := db.getCardForJira(ev.CardID) if err != nil { return false } if c.JiraKey != "" { return true } return c.CreatedAt >= m.CreatedAt } // ============================================================================= // Jira handler // ============================================================================= type jiraHandler struct{} type jiraConfig struct { BaseURL string `json:"base_url"` Email string `json:"email"` APIToken string `json:"api_token"` ProjectKey string `json:"project_key"` StatusMap map[string]string `json:"status_map"` } func parseJiraConfig(m Module) (jiraConfig, error) { b, err := json.Marshal(m.Config) if err != nil { return jiraConfig{}, err } var c jiraConfig if err := json.Unmarshal(b, &c); err != nil { return jiraConfig{}, err } c.BaseURL = strings.TrimRight(c.BaseURL, "/") if c.BaseURL == "" { return c, fmt.Errorf("base_url required") } return c, nil } func (h *jiraHandler) jiraRequest(ctx context.Context, c jiraConfig, method, path string, body interface{}) (int, []byte, error) { var rdr io.Reader if body != nil { b, err := json.Marshal(body) if err != nil { return 0, nil, err } rdr = bytes.NewReader(b) } req, err := http.NewRequestWithContext(ctx, method, c.BaseURL+path, rdr) if err != nil { return 0, nil, err } req.Header.Set("Accept", "application/json") if body != nil { req.Header.Set("Content-Type", "application/json") } if c.Email != "" && c.APIToken != "" { basic := base64.StdEncoding.EncodeToString([]byte(c.Email + ":" + c.APIToken)) req.Header.Set("Authorization", "Basic "+basic) } resp, err := http.DefaultClient.Do(req) if err != nil { return 0, nil, err } defer resp.Body.Close() respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) if resp.StatusCode >= 400 { return resp.StatusCode, respBody, fmt.Errorf("jira %s %s: %d %s", method, path, resp.StatusCode, truncate(respBody, 240)) } return resp.StatusCode, respBody, nil } func truncate(b []byte, n int) string { if len(b) <= n { return string(b) } return string(b[:n]) + "…" } func (h *jiraHandler) TestConnection(ctx context.Context, m Module) (int, error) { c, err := parseJiraConfig(m) if err != nil { return 0, err } status, _, err := h.jiraRequest(ctx, c, http.MethodGet, "/rest/api/3/myself", nil) return status, err } func (h *jiraHandler) Handle(ctx context.Context, db *DB, m Module, ev Event) (int, error) { c, err := parseJiraConfig(m) if err != nil { return 0, err } switch ev.Type { case "card.created": return h.create(ctx, db, c, ev) case "card.updated", "board.invalidated": return h.update(ctx, db, c, ev) case "card.moved": return h.transition(ctx, db, c, ev) case "message.created": return h.comment(ctx, db, c, ev) default: // Silently ignore unhandled event types so the dispatcher does not // retry on irrelevant traffic. return 200, nil } } func (h *jiraHandler) create(ctx context.Context, db *DB, c jiraConfig, ev Event) (int, error) { if ev.CardID == "" { return 0, nil } card, err := db.getCardForJira(ev.CardID) if err != nil { return 0, err } if card.JiraKey != "" { // Idempotent: card already linked to Jira; treat as update. return h.update(ctx, db, c, ev) } if c.ProjectKey == "" { return 0, fmt.Errorf("project_key required for create") } body := map[string]interface{}{ "fields": map[string]interface{}{ "project": map[string]string{"key": c.ProjectKey}, "summary": card.Title, "description": adfText(card.Description), "issuetype": map[string]string{"name": "Task"}, }, } status, resp, err := h.jiraRequest(ctx, c, http.MethodPost, "/rest/api/3/issue", body) if err != nil { return status, err } var parsed struct { Key string `json:"key"` } _ = json.Unmarshal(resp, &parsed) if parsed.Key != "" { if err := db.setCardJiraKey(card.ID, parsed.Key); err != nil { return status, fmt.Errorf("link jira key: %w", err) } } return status, nil } func (h *jiraHandler) update(ctx context.Context, db *DB, c jiraConfig, ev Event) (int, error) { if ev.CardID == "" { return 0, nil } card, err := db.getCardForJira(ev.CardID) if err != nil { return 0, err } if card.JiraKey == "" { // Card not yet linked — bootstrap by creating it. return h.create(ctx, db, c, ev) } body := map[string]interface{}{ "fields": map[string]interface{}{ "summary": card.Title, "description": adfText(card.Description), }, } status, _, err := h.jiraRequest(ctx, c, http.MethodPut, "/rest/api/3/issue/"+card.JiraKey, body) return status, err } // transition uses the configured status_map to translate the kanban column // to a Jira transition name. We list available transitions, find the one // whose target status name matches, and POST it. Kanban remains the source // of truth even if Jira's current state differs. func (h *jiraHandler) transition(ctx context.Context, db *DB, c jiraConfig, ev Event) (int, error) { if ev.CardID == "" { return 0, nil } card, err := db.getCardForJira(ev.CardID) if err != nil { return 0, err } if card.JiraKey == "" { return h.create(ctx, db, c, ev) } target, ok := c.StatusMap[card.ColumnName] if !ok || target == "" { return 0, fmt.Errorf("no status_map entry for column %q", card.ColumnName) } status, body, err := h.jiraRequest(ctx, c, http.MethodGet, "/rest/api/3/issue/"+card.JiraKey+"/transitions", nil) if err != nil { return status, err } var available struct { Transitions []struct { ID string `json:"id"` Name string `json:"name"` To struct { Name string `json:"name"` } `json:"to"` } `json:"transitions"` } if err := json.Unmarshal(body, &available); err != nil { return status, fmt.Errorf("decode transitions: %w", err) } var tID string for _, t := range available.Transitions { if strings.EqualFold(t.To.Name, target) || strings.EqualFold(t.Name, target) { tID = t.ID break } } if tID == "" { return 0, fmt.Errorf("transition %q not available for %s", target, card.JiraKey) } req := map[string]interface{}{"transition": map[string]string{"id": tID}} status, _, err = h.jiraRequest(ctx, c, http.MethodPost, "/rest/api/3/issue/"+card.JiraKey+"/transitions", req) return status, err } func (h *jiraHandler) comment(ctx context.Context, db *DB, c jiraConfig, ev Event) (int, error) { if ev.CardID == "" { return 0, nil } card, err := db.getCardForJira(ev.CardID) if err != nil { return 0, err } if card.JiraKey == "" { // Cannot comment on a card not yet synced; skip. return 0, nil } var payload struct { Body string `json:"body"` } _ = json.Unmarshal(ev.Payload, &payload) if payload.Body == "" { return 0, nil } body := map[string]interface{}{"body": adfText(payload.Body)} status, _, err := h.jiraRequest(ctx, c, http.MethodPost, "/rest/api/3/issue/"+card.JiraKey+"/comment", body) return status, err } // adfText wraps a plain string into the minimal Atlassian Document Format // fragment Jira Cloud requires for description / comment bodies. func adfText(s string) map[string]interface{} { return map[string]interface{}{ "type": "doc", "version": 1, "content": []map[string]interface{}{{ "type": "paragraph", "content": []map[string]interface{}{{ "type": "text", "text": s, }}, }}, } }