c5113f75a5
Cambios:
- jiraConfig: nuevo campo AssigneeMap (kanban_user_id -> jira_accountId).
- jiraHandler.create() y update(): aplican fields.assignee={accountId} cuando
card.AssigneeID esta en el map. NO se borra el assignee de Jira cuando no
hay mapeo (evita pisar asignaciones manuales).
- resolveJiraAssignee: helper compartido.
- seed-jira-data: cambio issue_type default Tarea Tecnica -> Epic (board 33
filtra issuetype=Epic). assignee_map inyectada con 3 mapeos confirmados:
egutierrez (Enmaa) -> 712020:2cf3b82f-... (Enmanuel Gutierrez Perez)
amassaguer (alfon) -> 712020:3f3ca9e1-... (Alfonso Massaguer Gomez)
ntajuelo (Nat) -> 712020:feb5f7c5-... (Natalia Tajuelo Gomez)
- Nueva CLI 'kanban resync-jira-fields' con flags
--set-issuetype/--set-assignee/--set-labels/--dry-run/--limit/--batch-size/--pause-sec
Idempotente. PUT /rest/api/3/issue/{key} con los fields del config actual.
Usado para patchear las 127 issues ya creadas con Tarea Tecnica -> Epic +
assignee (donde mapea).
- Ejecutado: 127/127 OK, 0 fail. Board 33 ahora muestra 219 issues totales
(92 Epics previas + 127 nuevas). Sample verificado contra Jira REST API.
990 lines
29 KiB
Go
990 lines
29 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// =============================================================================
|
|
// Module model
|
|
// =============================================================================
|
|
|
|
type Module struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Kind string `json:"kind"`
|
|
Enabled bool `json:"enabled"`
|
|
EventFilter []string `json:"event_filter"`
|
|
Config JSONValue `json:"config"`
|
|
CreatedAt string `json:"created_at"`
|
|
UpdatedAt string `json:"updated_at"`
|
|
}
|
|
|
|
// JSONValue is an arbitrary JSON object decoded into a generic map. We do not
|
|
// model per-kind config in Go types because the set of kinds grows over time
|
|
// and the dispatcher only inspects fields it knows.
|
|
type JSONValue map[string]interface{}
|
|
|
|
type ModuleLog struct {
|
|
ID string `json:"id"`
|
|
ModuleID string `json:"module_id"`
|
|
EventType string `json:"event_type"`
|
|
CardID string `json:"card_id"`
|
|
Status int `json:"status"`
|
|
DurationMs int `json:"duration_ms"`
|
|
Error string `json:"error"`
|
|
CreatedAt string `json:"created_at"`
|
|
}
|
|
|
|
// =============================================================================
|
|
// DB helpers (modules + logs)
|
|
// =============================================================================
|
|
|
|
// listModulesEnabled returns all enabled modules with their config decrypted.
|
|
// Disabled modules are silently skipped — callers iterate the result without
|
|
// further filtering.
|
|
func (db *DB) listModulesEnabled() ([]Module, error) {
|
|
return db.listModulesWhere("WHERE enabled = 1")
|
|
}
|
|
|
|
func (db *DB) listModulesAll() ([]Module, error) {
|
|
return db.listModulesWhere("")
|
|
}
|
|
|
|
func (db *DB) listModulesWhere(filter string) ([]Module, error) {
|
|
q := `SELECT id, name, kind, enabled, event_filter, config_cipher, config_nonce, created_at, updated_at FROM modules ` + filter + ` ORDER BY created_at`
|
|
rows, err := db.conn.Query(q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
out := []Module{}
|
|
for rows.Next() {
|
|
var m Module
|
|
var enabled int
|
|
var filter, createdAt, updatedAt string
|
|
var cipherBlob, nonce []byte
|
|
if err := rows.Scan(&m.ID, &m.Name, &m.Kind, &enabled, &filter, &cipherBlob, &nonce, &createdAt, &updatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
m.Enabled = enabled == 1
|
|
m.EventFilter = splitCSV(filter)
|
|
m.CreatedAt = createdAt
|
|
m.UpdatedAt = updatedAt
|
|
cfg, err := decryptConfig(cipherBlob, nonce)
|
|
if err != nil {
|
|
// Surface the decrypt failure so the operator notices but
|
|
// avoid dropping the module from the list entirely.
|
|
log.Printf("module %s: decrypt config: %v", m.ID, err)
|
|
m.Config = JSONValue{"_decrypt_error": err.Error()}
|
|
} else {
|
|
_ = json.Unmarshal(cfg, &m.Config)
|
|
}
|
|
out = append(out, m)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func (db *DB) getModule(id string) (*Module, error) {
|
|
mods, err := db.listModulesWhere(`WHERE id = '` + escapeSQL(id) + `'`)
|
|
if err != nil || len(mods) == 0 {
|
|
if err == nil {
|
|
err = sql.ErrNoRows
|
|
}
|
|
return nil, err
|
|
}
|
|
return &mods[0], nil
|
|
}
|
|
|
|
func escapeSQL(s string) string {
|
|
return strings.ReplaceAll(s, "'", "''")
|
|
}
|
|
|
|
func (db *DB) saveModule(m *Module) error {
|
|
cfgJSON, err := json.Marshal(m.Config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
cipherBlob, nonce, err := encryptConfig(cfgJSON)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
now := nowRFC3339()
|
|
if m.ID == "" {
|
|
m.ID = newID()
|
|
m.CreatedAt = now
|
|
m.UpdatedAt = now
|
|
_, err = db.conn.Exec(
|
|
`INSERT INTO modules (id, name, kind, enabled, event_filter, config_cipher, config_nonce, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
m.ID, m.Name, m.Kind, boolInt(m.Enabled), strings.Join(m.EventFilter, ","),
|
|
cipherBlob, nonce, m.CreatedAt, m.UpdatedAt,
|
|
)
|
|
return err
|
|
}
|
|
m.UpdatedAt = now
|
|
_, err = db.conn.Exec(
|
|
`UPDATE modules SET name=?, kind=?, enabled=?, event_filter=?, config_cipher=?, config_nonce=?, updated_at=? WHERE id=?`,
|
|
m.Name, m.Kind, boolInt(m.Enabled), strings.Join(m.EventFilter, ","),
|
|
cipherBlob, nonce, m.UpdatedAt, m.ID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (db *DB) deleteModule(id string) error {
|
|
_, err := db.conn.Exec(`DELETE FROM modules WHERE id=?`, id)
|
|
return err
|
|
}
|
|
|
|
func (db *DB) appendModuleLog(l ModuleLog) error {
|
|
if l.ID == "" {
|
|
l.ID = newID()
|
|
}
|
|
if l.CreatedAt == "" {
|
|
l.CreatedAt = nowRFC3339()
|
|
}
|
|
_, err := db.conn.Exec(
|
|
`INSERT INTO module_logs (id, module_id, event_type, card_id, status, duration_ms, error, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
l.ID, l.ModuleID, l.EventType, l.CardID, l.Status, l.DurationMs, l.Error, l.CreatedAt,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (db *DB) listModuleLogs(moduleID string, limit int) ([]ModuleLog, error) {
|
|
if limit <= 0 {
|
|
limit = 100
|
|
}
|
|
rows, err := db.conn.Query(
|
|
`SELECT id, module_id, event_type, card_id, status, duration_ms, error, created_at
|
|
FROM module_logs WHERE module_id = ? ORDER BY created_at DESC LIMIT ?`,
|
|
moduleID, limit,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
out := []ModuleLog{}
|
|
for rows.Next() {
|
|
var l ModuleLog
|
|
var cardID sql.NullString
|
|
if err := rows.Scan(&l.ID, &l.ModuleID, &l.EventType, &cardID, &l.Status, &l.DurationMs, &l.Error, &l.CreatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
if cardID.Valid {
|
|
l.CardID = cardID.String
|
|
}
|
|
out = append(out, l)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// setCardJiraKey stores the Jira issue key for a card after a successful
|
|
// create call. We skip the regular UpdateCard path to avoid emitting a
|
|
// `card.updated` event (which would loop us back through the dispatcher).
|
|
func (db *DB) setCardJiraKey(cardID, jiraKey string) error {
|
|
_, err := db.conn.Exec(`UPDATE cards SET jira_key=? WHERE id=?`, jiraKey, cardID)
|
|
return err
|
|
}
|
|
|
|
// listImportedJiraKeys returns a set of jira keys currently linked to any
|
|
// active kanban card. Used by the Jira import picker to filter out issues
|
|
// already present in the kanban.
|
|
func (db *DB) listImportedJiraKeys() (map[string]bool, error) {
|
|
rows, err := db.conn.Query(`SELECT jira_key FROM cards WHERE jira_key != ''`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
out := map[string]bool{}
|
|
for rows.Next() {
|
|
var k string
|
|
if err := rows.Scan(&k); err != nil {
|
|
return nil, err
|
|
}
|
|
out[k] = true
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// listColumnsByName returns columns keyed by name for status-map reverse
|
|
// lookup during Jira import.
|
|
func (db *DB) listColumnsByName() (map[string]Column, error) {
|
|
cols, err := db.ListColumns()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make(map[string]Column, len(cols))
|
|
for _, c := range cols {
|
|
out[c.Name] = c
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// findCardByJiraKey returns the id of the card linked to jiraKey, or "" if
|
|
// no card carries that link. The lookup ignores soft-deleted cards.
|
|
func (db *DB) findCardByJiraKey(jiraKey string) (string, error) {
|
|
var id string
|
|
err := db.conn.QueryRow(
|
|
`SELECT id FROM cards WHERE jira_key = ? AND deleted_at IS NULL LIMIT 1`,
|
|
jiraKey,
|
|
).Scan(&id)
|
|
if err == sql.ErrNoRows {
|
|
return "", nil
|
|
}
|
|
return id, err
|
|
}
|
|
|
|
// updateCardJiraSync updates the per-card sync-state columns. statusName is
|
|
// preserved when empty (so we do not blank it on events that do not change
|
|
// the Jira status, like comments).
|
|
func (db *DB) updateCardJiraSync(cardID, statusName, syncAt, errMsg string) error {
|
|
if statusName != "" {
|
|
_, err := db.conn.Exec(
|
|
`UPDATE cards SET jira_last_status=?, jira_last_sync_at=?, jira_last_error=? WHERE id=?`,
|
|
statusName, syncAt, errMsg, cardID,
|
|
)
|
|
return err
|
|
}
|
|
_, err := db.conn.Exec(
|
|
`UPDATE cards SET jira_last_sync_at=?, jira_last_error=? WHERE id=?`,
|
|
syncAt, errMsg, cardID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
// CardJiraSyncState is the row returned by /api/cards/{id}/jira-sync.
|
|
type CardJiraSyncState struct {
|
|
CardID string `json:"card_id"`
|
|
JiraKey string `json:"jira_key"`
|
|
LastStatus string `json:"last_status"`
|
|
LastSyncAt string `json:"last_sync_at"`
|
|
LastError string `json:"last_error"`
|
|
Inflight bool `json:"inflight"`
|
|
IssueURL string `json:"issue_url,omitempty"`
|
|
}
|
|
|
|
// readCardJiraSync loads the persisted sync state for a card. Callers add the
|
|
// inflight flag + issue url separately because those depend on runtime state
|
|
// (dispatcher map) and module config (base url).
|
|
func (db *DB) readCardJiraSync(cardID string) (CardJiraSyncState, error) {
|
|
var s CardJiraSyncState
|
|
s.CardID = cardID
|
|
var jiraKey, lastStatus, lastSyncAt, lastError sql.NullString
|
|
err := db.conn.QueryRow(
|
|
`SELECT jira_key, jira_last_status, jira_last_sync_at, jira_last_error
|
|
FROM cards WHERE id = ?`, cardID,
|
|
).Scan(&jiraKey, &lastStatus, &lastSyncAt, &lastError)
|
|
if err != nil {
|
|
return s, err
|
|
}
|
|
if jiraKey.Valid {
|
|
s.JiraKey = jiraKey.String
|
|
}
|
|
if lastStatus.Valid {
|
|
s.LastStatus = lastStatus.String
|
|
}
|
|
if lastSyncAt.Valid {
|
|
s.LastSyncAt = lastSyncAt.String
|
|
}
|
|
if lastError.Valid {
|
|
s.LastError = lastError.String
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
func (db *DB) getCardForJira(cardID string) (*cardForJira, error) {
|
|
var c cardForJira
|
|
var assignee, deadline, jiraKey sql.NullString
|
|
var tagsJSON string
|
|
err := db.conn.QueryRow(
|
|
`SELECT c.id, c.title, c.description, c.requester, c.column_id, c.assignee_id,
|
|
c.deadline, c.tags, c.jira_key, c.created_at, col.name
|
|
FROM cards c JOIN columns col ON col.id = c.column_id WHERE c.id = ?`,
|
|
cardID,
|
|
).Scan(&c.ID, &c.Title, &c.Description, &c.Requester, &c.ColumnID, &assignee,
|
|
&deadline, &tagsJSON, &jiraKey, &c.CreatedAt, &c.ColumnName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if assignee.Valid {
|
|
c.AssigneeID = assignee.String
|
|
}
|
|
if deadline.Valid {
|
|
c.Deadline = deadline.String
|
|
}
|
|
if jiraKey.Valid {
|
|
c.JiraKey = jiraKey.String
|
|
}
|
|
_ = json.Unmarshal([]byte(tagsJSON), &c.Tags)
|
|
return &c, nil
|
|
}
|
|
|
|
type cardForJira struct {
|
|
ID string
|
|
Title string
|
|
Description string
|
|
Requester string
|
|
ColumnID string
|
|
ColumnName string
|
|
AssigneeID string
|
|
Deadline string
|
|
Tags []string
|
|
JiraKey string
|
|
CreatedAt string
|
|
}
|
|
|
|
func (c *cardForJira) hasTag(name string) bool {
|
|
name = strings.ToLower(name)
|
|
for _, t := range c.Tags {
|
|
if strings.ToLower(t) == name {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func splitCSV(s string) []string {
|
|
if s == "" {
|
|
return nil
|
|
}
|
|
parts := strings.Split(s, ",")
|
|
out := make([]string, 0, len(parts))
|
|
for _, p := range parts {
|
|
p = strings.TrimSpace(p)
|
|
if p != "" {
|
|
out = append(out, p)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func boolInt(b bool) int {
|
|
if b {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// =============================================================================
|
|
// Dispatcher
|
|
// =============================================================================
|
|
|
|
const (
|
|
moduleRetries = 3
|
|
moduleRetryDelay1 = 1 * time.Second
|
|
moduleRetryDelay2 = 5 * time.Second
|
|
moduleRetryDelay3 = 30 * time.Second
|
|
moduleHTTPTimeout = 15 * time.Second
|
|
moduleOptOutTag = "nojira"
|
|
moduleDispatchQueue = 256
|
|
)
|
|
|
|
// Dispatcher fans events from the EventHub into per-module handlers.
|
|
//
|
|
// Lifecycle:
|
|
// - Start() spawns a single subscriber goroutine on the hub plus a
|
|
// bounded worker pool.
|
|
// - Stop() cancels the context and waits for in-flight requests to drain.
|
|
//
|
|
// Handlers receive a decrypted Module copy + the Event; they own the HTTP
|
|
// call to the target system. The dispatcher logs every attempt.
|
|
type Dispatcher struct {
|
|
db *DB
|
|
hub *EventHub
|
|
handlers map[string]Handler
|
|
queue chan dispatchTask
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
enabled bool
|
|
// inflight tracks cards whose sync is currently being attempted. Used by
|
|
// /api/cards/{id}/jira-sync to render the "yellow" state in the UI.
|
|
inflight sync.Map // map[cardID]struct{}
|
|
}
|
|
|
|
// IsInflight reports whether a sync attempt is currently being executed for
|
|
// the given card. Callers can use it to render a "syncing" indicator.
|
|
func (d *Dispatcher) IsInflight(cardID string) bool {
|
|
if d == nil {
|
|
return false
|
|
}
|
|
_, ok := d.inflight.Load(cardID)
|
|
return ok
|
|
}
|
|
|
|
type dispatchTask struct {
|
|
module Module
|
|
event Event
|
|
}
|
|
|
|
type Handler interface {
|
|
Handle(ctx context.Context, db *DB, m Module, ev Event) (status int, err error)
|
|
TestConnection(ctx context.Context, m Module) (status int, err error)
|
|
}
|
|
|
|
func NewDispatcher(db *DB, hub *EventHub) *Dispatcher {
|
|
_, hasKey := moduleKey()
|
|
return &Dispatcher{
|
|
db: db,
|
|
hub: hub,
|
|
handlers: map[string]Handler{"jira": &jiraHandler{}},
|
|
queue: make(chan dispatchTask, moduleDispatchQueue),
|
|
enabled: hasKey,
|
|
}
|
|
}
|
|
|
|
func (d *Dispatcher) Start() {
|
|
if !d.enabled {
|
|
log.Printf("module dispatcher disabled (%s not set)", moduleKeyEnv)
|
|
return
|
|
}
|
|
d.ctx, d.cancel = context.WithCancel(context.Background())
|
|
// Subscribe under a synthetic user so the hub treats us as a normal
|
|
// recipient of broadcast events. Private user-targeted events are
|
|
// uninteresting for outbound sync.
|
|
go d.run()
|
|
for i := 0; i < 4; i++ {
|
|
go d.worker(i)
|
|
}
|
|
log.Printf("module dispatcher started")
|
|
}
|
|
|
|
func (d *Dispatcher) Stop() {
|
|
if d.cancel != nil {
|
|
d.cancel()
|
|
}
|
|
}
|
|
|
|
func (d *Dispatcher) run() {
|
|
ch := d.hub.SubscribeUser("__module_dispatcher__")
|
|
defer d.hub.UnsubscribeUser("__module_dispatcher__", ch)
|
|
for {
|
|
select {
|
|
case <-d.ctx.Done():
|
|
return
|
|
case ev, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
d.fanout(ev)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Dispatcher) fanout(ev Event) {
|
|
mods, err := d.db.listModulesEnabled()
|
|
if err != nil {
|
|
log.Printf("dispatcher: listModulesEnabled: %v", err)
|
|
return
|
|
}
|
|
for _, m := range mods {
|
|
if !filterMatches(m.EventFilter, ev.Type) {
|
|
continue
|
|
}
|
|
if !cutoffOK(d.db, m, ev) {
|
|
continue
|
|
}
|
|
if ev.CardID != "" {
|
|
c, err := d.db.getCardForJira(ev.CardID)
|
|
if err == nil && c.hasTag(moduleOptOutTag) {
|
|
continue
|
|
}
|
|
}
|
|
select {
|
|
case d.queue <- dispatchTask{module: m, event: ev}:
|
|
default:
|
|
log.Printf("dispatcher: queue full, dropping event %s for module %s", ev.Type, m.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Dispatcher) worker(id int) {
|
|
for {
|
|
select {
|
|
case <-d.ctx.Done():
|
|
return
|
|
case task, ok := <-d.queue:
|
|
if !ok {
|
|
return
|
|
}
|
|
d.dispatch(task)
|
|
}
|
|
}
|
|
}
|
|
|
|
// dispatch runs the handler with up to moduleRetries attempts using a
|
|
// fixed back-off schedule (1s, 5s, 30s). Each attempt creates a log row;
|
|
// the final outcome is the one returned to the caller.
|
|
func (d *Dispatcher) dispatch(t dispatchTask) {
|
|
h, ok := d.handlers[t.module.Kind]
|
|
if !ok {
|
|
_ = d.db.appendModuleLog(ModuleLog{
|
|
ModuleID: t.module.ID, EventType: t.event.Type, CardID: t.event.CardID,
|
|
Error: "unknown module kind: " + t.module.Kind,
|
|
})
|
|
return
|
|
}
|
|
if t.event.CardID != "" {
|
|
d.inflight.Store(t.event.CardID, struct{}{})
|
|
defer d.inflight.Delete(t.event.CardID)
|
|
}
|
|
delays := []time.Duration{0, moduleRetryDelay1, moduleRetryDelay2, moduleRetryDelay3}
|
|
var lastErr error
|
|
var lastStatus int
|
|
for attempt := 0; attempt < moduleRetries; attempt++ {
|
|
if delays[attempt] > 0 {
|
|
select {
|
|
case <-d.ctx.Done():
|
|
return
|
|
case <-time.After(delays[attempt]):
|
|
}
|
|
}
|
|
ctx, cancel := context.WithTimeout(d.ctx, moduleHTTPTimeout)
|
|
start := time.Now()
|
|
status, err := h.Handle(ctx, d.db, t.module, t.event)
|
|
cancel()
|
|
ml := ModuleLog{
|
|
ModuleID: t.module.ID, EventType: t.event.Type, CardID: t.event.CardID,
|
|
Status: status, DurationMs: int(time.Since(start).Milliseconds()),
|
|
}
|
|
if err != nil {
|
|
ml.Error = err.Error()
|
|
}
|
|
_ = d.db.appendModuleLog(ml)
|
|
lastErr = err
|
|
lastStatus = status
|
|
if err == nil {
|
|
d.recordCardSyncSuccess(t.module, t.event)
|
|
return
|
|
}
|
|
// 4xx client errors are not worth retrying.
|
|
if status >= 400 && status < 500 {
|
|
break
|
|
}
|
|
}
|
|
// All retries exhausted (or stopped early on 4xx). Persist the failure
|
|
// so the UI can render the card as out-of-sync without polling Jira.
|
|
d.recordCardSyncFailure(t.event, lastErr, lastStatus)
|
|
}
|
|
|
|
// recordCardSyncSuccess persists the post-sync state to cards.jira_last_*
|
|
// columns. The "status" stored mirrors what we asked Jira to land at via the
|
|
// status_map; comment events leave the status field unchanged.
|
|
func (d *Dispatcher) recordCardSyncSuccess(m Module, ev Event) {
|
|
if ev.CardID == "" {
|
|
return
|
|
}
|
|
now := time.Now().UTC().Format(time.RFC3339)
|
|
var statusName string
|
|
if m.Kind == "jira" && ev.Type != "message.created" {
|
|
cfg, err := parseJiraConfig(m)
|
|
if err == nil {
|
|
card, cerr := d.db.getCardForJira(ev.CardID)
|
|
if cerr == nil {
|
|
statusName = cfg.StatusMap[card.ColumnName]
|
|
}
|
|
}
|
|
}
|
|
if err := d.db.updateCardJiraSync(ev.CardID, statusName, now, ""); err != nil {
|
|
log.Printf("dispatcher: updateCardJiraSync(success) %s: %v", ev.CardID, err)
|
|
}
|
|
}
|
|
|
|
func (d *Dispatcher) recordCardSyncFailure(ev Event, err error, status int) {
|
|
if ev.CardID == "" {
|
|
return
|
|
}
|
|
now := time.Now().UTC().Format(time.RFC3339)
|
|
msg := "sync failed"
|
|
if err != nil {
|
|
msg = err.Error()
|
|
}
|
|
if status > 0 {
|
|
msg = fmt.Sprintf("(http %d) %s", status, msg)
|
|
}
|
|
if uerr := d.db.updateCardJiraSync(ev.CardID, "", now, msg); uerr != nil {
|
|
log.Printf("dispatcher: updateCardJiraSync(failure) %s: %v", ev.CardID, uerr)
|
|
}
|
|
}
|
|
|
|
// =============================================================================
|
|
// Helpers
|
|
// =============================================================================
|
|
|
|
func filterMatches(filter []string, eventType string) bool {
|
|
for _, f := range filter {
|
|
if f == eventType || f == "*" {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// cutoffOK applies the "module only sees events posterior to its creation"
|
|
// rule. Cards that were already linked to Jira (jira_key != "") are always
|
|
// eligible regardless of timestamps.
|
|
func cutoffOK(db *DB, m Module, ev Event) bool {
|
|
if ev.CardID == "" {
|
|
return true
|
|
}
|
|
c, err := db.getCardForJira(ev.CardID)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
if c.JiraKey != "" {
|
|
return true
|
|
}
|
|
return c.CreatedAt >= m.CreatedAt
|
|
}
|
|
|
|
// =============================================================================
|
|
// Jira handler
|
|
// =============================================================================
|
|
|
|
type jiraHandler struct{}
|
|
|
|
type jiraConfig struct {
|
|
BaseURL string `json:"base_url"`
|
|
Email string `json:"email"`
|
|
APIToken string `json:"api_token"`
|
|
ProjectKey string `json:"project_key"`
|
|
BoardID int `json:"board_id"`
|
|
IssueType string `json:"issue_type"` // Jira issuetype name applied on create
|
|
StatusMap map[string]string `json:"status_map"` // kanban_column_name -> Jira status name
|
|
LabelsMap map[string][]string `json:"labels_map,omitempty"` // kanban_column_name -> Jira labels (replaces every sync)
|
|
AssigneeMap map[string]string `json:"assignee_map,omitempty"` // kanban_user_id -> Jira accountId
|
|
}
|
|
|
|
func parseJiraConfig(m Module) (jiraConfig, error) {
|
|
b, err := json.Marshal(m.Config)
|
|
if err != nil {
|
|
return jiraConfig{}, err
|
|
}
|
|
var c jiraConfig
|
|
if err := json.Unmarshal(b, &c); err != nil {
|
|
return jiraConfig{}, err
|
|
}
|
|
c.BaseURL = strings.TrimRight(c.BaseURL, "/")
|
|
if c.BaseURL == "" {
|
|
return c, fmt.Errorf("base_url required")
|
|
}
|
|
if c.IssueType == "" {
|
|
c.IssueType = "Tarea Técnica"
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
func (h *jiraHandler) jiraRequest(ctx context.Context, c jiraConfig, method, path string, body interface{}) (int, []byte, error) {
|
|
var rdr io.Reader
|
|
if body != nil {
|
|
b, err := json.Marshal(body)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
rdr = bytes.NewReader(b)
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, method, c.BaseURL+path, rdr)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
req.Header.Set("Accept", "application/json")
|
|
if body != nil {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
if c.Email != "" && c.APIToken != "" {
|
|
basic := base64.StdEncoding.EncodeToString([]byte(c.Email + ":" + c.APIToken))
|
|
req.Header.Set("Authorization", "Basic "+basic)
|
|
}
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
|
|
if resp.StatusCode >= 400 {
|
|
return resp.StatusCode, respBody, fmt.Errorf("jira %s %s: %d %s", method, path, resp.StatusCode, truncate(respBody, 240))
|
|
}
|
|
return resp.StatusCode, respBody, nil
|
|
}
|
|
|
|
func truncate(b []byte, n int) string {
|
|
if len(b) <= n {
|
|
return string(b)
|
|
}
|
|
return string(b[:n]) + "…"
|
|
}
|
|
|
|
func (h *jiraHandler) TestConnection(ctx context.Context, m Module) (int, error) {
|
|
c, err := parseJiraConfig(m)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
status, _, err := h.jiraRequest(ctx, c, http.MethodGet, "/rest/api/3/myself", nil)
|
|
if err != nil {
|
|
return status, err
|
|
}
|
|
// If a board scope is configured, verify the board exists AND lives in
|
|
// the declared project. Refuse silently-mismatched configurations so a
|
|
// typo in project_key cannot create issues outside the intended board.
|
|
if c.BoardID > 0 {
|
|
bStatus, body, err := h.jiraRequest(ctx, c, http.MethodGet,
|
|
fmt.Sprintf("/rest/agile/1.0/board/%d", c.BoardID), nil)
|
|
if err != nil {
|
|
return bStatus, fmt.Errorf("board %d lookup: %w", c.BoardID, err)
|
|
}
|
|
var board struct {
|
|
Type string `json:"type"`
|
|
Location struct {
|
|
ProjectKey string `json:"projectKey"`
|
|
} `json:"location"`
|
|
}
|
|
if err := json.Unmarshal(body, &board); err != nil {
|
|
return bStatus, fmt.Errorf("decode board %d: %w", c.BoardID, err)
|
|
}
|
|
if c.ProjectKey != "" && !strings.EqualFold(board.Location.ProjectKey, c.ProjectKey) {
|
|
return 0, fmt.Errorf("board %d belongs to project %q, config declares %q",
|
|
c.BoardID, board.Location.ProjectKey, c.ProjectKey)
|
|
}
|
|
}
|
|
return status, nil
|
|
}
|
|
|
|
func (h *jiraHandler) Handle(ctx context.Context, db *DB, m Module, ev Event) (int, error) {
|
|
c, err := parseJiraConfig(m)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
switch ev.Type {
|
|
case "card.created":
|
|
return h.create(ctx, db, c, ev)
|
|
case "card.updated", "board.invalidated":
|
|
return h.update(ctx, db, c, ev)
|
|
case "card.moved":
|
|
return h.transition(ctx, db, c, ev)
|
|
case "message.created":
|
|
return h.comment(ctx, db, c, ev)
|
|
default:
|
|
// Silently ignore unhandled event types so the dispatcher does not
|
|
// retry on irrelevant traffic.
|
|
return 200, nil
|
|
}
|
|
}
|
|
|
|
func (h *jiraHandler) create(ctx context.Context, db *DB, c jiraConfig, ev Event) (int, error) {
|
|
if ev.CardID == "" {
|
|
return 0, nil
|
|
}
|
|
card, err := db.getCardForJira(ev.CardID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if card.JiraKey != "" {
|
|
// Idempotent: card already linked to Jira; treat as update.
|
|
return h.update(ctx, db, c, ev)
|
|
}
|
|
if c.ProjectKey == "" {
|
|
return 0, fmt.Errorf("project_key required for create (configure module before pushing)")
|
|
}
|
|
fields := map[string]interface{}{
|
|
"project": map[string]string{"key": c.ProjectKey},
|
|
"summary": card.Title,
|
|
"description": adfText(card.Description),
|
|
"issuetype": map[string]string{"name": c.IssueType},
|
|
}
|
|
if labels := c.LabelsMap[card.ColumnName]; len(labels) > 0 {
|
|
fields["labels"] = labels
|
|
}
|
|
if acct := resolveJiraAssignee(c, card); acct != "" {
|
|
fields["assignee"] = map[string]string{"accountId": acct}
|
|
}
|
|
body := map[string]interface{}{"fields": fields}
|
|
status, resp, err := h.jiraRequest(ctx, c, http.MethodPost, "/rest/api/3/issue", body)
|
|
if err != nil {
|
|
return status, err
|
|
}
|
|
var parsed struct {
|
|
Key string `json:"key"`
|
|
}
|
|
_ = json.Unmarshal(resp, &parsed)
|
|
if parsed.Key == "" {
|
|
return status, fmt.Errorf("jira create returned empty key")
|
|
}
|
|
if err := db.setCardJiraKey(card.ID, parsed.Key); err != nil {
|
|
return status, fmt.Errorf("link jira key: %w", err)
|
|
}
|
|
// Jira places new issues in the workflow's initial status (typically
|
|
// CREADO / To Do for DATA). Drive a transition immediately so the issue
|
|
// lands in the column that mirrors where the card is in kanban.
|
|
if _, ok := c.StatusMap[card.ColumnName]; ok {
|
|
if _, err := h.transitionToStatus(ctx, c, parsed.Key, card.ColumnName); err != nil {
|
|
return status, fmt.Errorf("created %s but initial transition failed: %w", parsed.Key, err)
|
|
}
|
|
}
|
|
return status, nil
|
|
}
|
|
|
|
func (h *jiraHandler) update(ctx context.Context, db *DB, c jiraConfig, ev Event) (int, error) {
|
|
if ev.CardID == "" {
|
|
return 0, nil
|
|
}
|
|
card, err := db.getCardForJira(ev.CardID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if card.JiraKey == "" {
|
|
// Card not yet linked — bootstrap by creating it.
|
|
return h.create(ctx, db, c, ev)
|
|
}
|
|
fields := map[string]interface{}{
|
|
"summary": card.Title,
|
|
"description": adfText(card.Description),
|
|
}
|
|
// Labels are derived from the current kanban column. We always send them
|
|
// (even an empty array) so a card that leaves a labelled column gets its
|
|
// label removed from Jira — PUT fields.labels REPLACES the whole array.
|
|
labels := c.LabelsMap[card.ColumnName]
|
|
if labels == nil {
|
|
labels = []string{}
|
|
}
|
|
fields["labels"] = labels
|
|
if acct := resolveJiraAssignee(c, card); acct != "" {
|
|
fields["assignee"] = map[string]string{"accountId": acct}
|
|
}
|
|
body := map[string]interface{}{"fields": fields}
|
|
status, _, err := h.jiraRequest(ctx, c, http.MethodPut, "/rest/api/3/issue/"+card.JiraKey, body)
|
|
return status, err
|
|
}
|
|
|
|
// resolveJiraAssignee maps the kanban card's assignee_id to a Jira accountId
|
|
// via the module's assignee_map. Returns "" when the card has no assignee or
|
|
// the assignee is not mapped, signalling to the caller to omit the field
|
|
// (avoids accidentally CLEARING an existing Jira assignee on every sync).
|
|
func resolveJiraAssignee(c jiraConfig, card *cardForJira) string {
|
|
if card == nil || card.AssigneeID == "" {
|
|
return ""
|
|
}
|
|
return c.AssigneeMap[card.AssigneeID]
|
|
}
|
|
|
|
// transition uses the configured status_map to translate the kanban column
|
|
// to a Jira transition name. Kanban remains the source of truth even if
|
|
// Jira's current state differs.
|
|
func (h *jiraHandler) transition(ctx context.Context, db *DB, c jiraConfig, ev Event) (int, error) {
|
|
if ev.CardID == "" {
|
|
return 0, nil
|
|
}
|
|
card, err := db.getCardForJira(ev.CardID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if card.JiraKey == "" {
|
|
return h.create(ctx, db, c, ev)
|
|
}
|
|
if _, ok := c.StatusMap[card.ColumnName]; !ok {
|
|
return 0, fmt.Errorf("no status_map entry for column %q", card.ColumnName)
|
|
}
|
|
return h.transitionToStatus(ctx, c, card.JiraKey, card.ColumnName)
|
|
}
|
|
|
|
// transitionToStatus drives a Jira issue to the status mapped from the given
|
|
// kanban column and refreshes labels accordingly. Used by transition() on
|
|
// card.moved events and by create() right after issue creation so new issues
|
|
// do not stall at the workflow's default initial status.
|
|
func (h *jiraHandler) transitionToStatus(ctx context.Context, c jiraConfig, jiraKey, columnName string) (int, error) {
|
|
target := c.StatusMap[columnName]
|
|
if target == "" {
|
|
return 0, fmt.Errorf("no status_map entry for column %q", columnName)
|
|
}
|
|
status, body, err := h.jiraRequest(ctx, c, http.MethodGet, "/rest/api/3/issue/"+jiraKey+"/transitions", nil)
|
|
if err != nil {
|
|
return status, err
|
|
}
|
|
var available struct {
|
|
Transitions []struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
To struct {
|
|
Name string `json:"name"`
|
|
} `json:"to"`
|
|
} `json:"transitions"`
|
|
}
|
|
if err := json.Unmarshal(body, &available); err != nil {
|
|
return status, fmt.Errorf("decode transitions: %w", err)
|
|
}
|
|
var tID string
|
|
for _, t := range available.Transitions {
|
|
if strings.EqualFold(t.To.Name, target) || strings.EqualFold(t.Name, target) {
|
|
tID = t.ID
|
|
break
|
|
}
|
|
}
|
|
if tID == "" {
|
|
return 0, fmt.Errorf("transition %q not available for %s", target, jiraKey)
|
|
}
|
|
req := map[string]interface{}{"transition": map[string]string{"id": tID}}
|
|
status, _, err = h.jiraRequest(ctx, c, http.MethodPost, "/rest/api/3/issue/"+jiraKey+"/transitions", req)
|
|
if err != nil {
|
|
return status, err
|
|
}
|
|
// Refresh labels to match the new column. Replaces the labels array; an
|
|
// empty list strips any stale labels from the previous column.
|
|
labels := c.LabelsMap[columnName]
|
|
if labels == nil {
|
|
labels = []string{}
|
|
}
|
|
lbody := map[string]interface{}{"fields": map[string]interface{}{"labels": labels}}
|
|
lStatus, _, lErr := h.jiraRequest(ctx, c, http.MethodPut, "/rest/api/3/issue/"+jiraKey, lbody)
|
|
if lErr != nil {
|
|
return lStatus, fmt.Errorf("transition ok but labels sync failed: %w", lErr)
|
|
}
|
|
return status, nil
|
|
}
|
|
|
|
func (h *jiraHandler) comment(ctx context.Context, db *DB, c jiraConfig, ev Event) (int, error) {
|
|
if ev.CardID == "" {
|
|
return 0, nil
|
|
}
|
|
card, err := db.getCardForJira(ev.CardID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if card.JiraKey == "" {
|
|
// Cannot comment on a card not yet synced; skip.
|
|
return 0, nil
|
|
}
|
|
var payload struct {
|
|
Body string `json:"body"`
|
|
}
|
|
_ = json.Unmarshal(ev.Payload, &payload)
|
|
if payload.Body == "" {
|
|
return 0, nil
|
|
}
|
|
body := map[string]interface{}{"body": adfText(payload.Body)}
|
|
status, _, err := h.jiraRequest(ctx, c, http.MethodPost, "/rest/api/3/issue/"+card.JiraKey+"/comment", body)
|
|
return status, err
|
|
}
|
|
|
|
// adfText wraps a plain string into the minimal Atlassian Document Format
|
|
// fragment Jira Cloud requires for description / comment bodies.
|
|
func adfText(s string) map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"type": "doc",
|
|
"version": 1,
|
|
"content": []map[string]interface{}{{
|
|
"type": "paragraph",
|
|
"content": []map[string]interface{}{{
|
|
"type": "text",
|
|
"text": s,
|
|
}},
|
|
}},
|
|
}
|
|
}
|