package main

import (
	"bufio"
	"context"
	"encoding/json"
	"log"
	"os/exec"
	"strconv"
	"time"

	"fn-registry/functions/infra"
)

// journalEntry holds the fields decoded from one line of `journalctl -o json`
// output; every other field in the record is ignored.
type journalEntry struct {
	Message  json.RawMessage `json:"MESSAGE"`
	Unit     string          `json:"_SYSTEMD_UNIT"`
	Comm     string          `json:"_COMM"`
	Realtime string          `json:"__REALTIME_TIMESTAMP"` // microseconds since epoch, as a string
	Priority string          `json:"PRIORITY"`
}

// message returns MESSAGE as text. The raw value is kept undecoded in the
// struct because it arrives in one of two shapes: a JSON string for ordinary
// text, or a JSON array of byte values for binary/non-UTF8 payloads. Anything
// else (including an absent field) yields the empty string.
func (e journalEntry) message() string {
	raw := e.Message
	if len(raw) == 0 {
		return ""
	}

	// Ordinary text: a quoted JSON string.
	if raw[0] == '"' {
		var text string
		if err := json.Unmarshal(raw, &text); err == nil {
			return text
		}
	}

	// Otherwise try an array of numbers, one per byte.
	var codes []int
	if err := json.Unmarshal(raw, &codes); err != nil {
		return ""
	}
	out := make([]byte, 0, len(codes))
	for _, c := range codes {
		out = append(out, byte(c))
	}
	return string(out)
}

// tsNs converts the entry's microsecond timestamp to nanoseconds. When the
// field does not parse as an integer, or parses to zero, now is returned
// instead.
func (e journalEntry) tsNs(now int64) int64 {
	micros, err := strconv.ParseInt(e.Realtime, 10, 64)
	if err == nil && micros != 0 {
		return micros * 1000
	}
	return now
}

// logLine is one journal message queued for shipping: its timestamp in
// nanoseconds, the unit it is grouped under, and the message text.
type logLine struct {
	ts   int64
	unit string
	line string
}

// shipJournald follows the systemd journal and pushes new lines to Loki in
// batches, grouped into one stream per unit. It returns when ctx is cancelled.
// If journalctl is not available (e.g. on Android/Termux) it logs once and exits
// without error, leaving metrics shipping unaffected.
func shipJournald(ctx context.Context, cfg Config) { cmd := exec.CommandContext(ctx, "journalctl", "-f", "-o", "json", "-n", "0", "--no-pager") stdout, err := cmd.StdoutPipe() if err != nil { log.Printf("logs: cannot pipe journalctl: %v", err) return } if err := cmd.Start(); err != nil { log.Printf("logs: journalctl unavailable, log shipping disabled: %v", err) return } log.Print("logs: journald shipping started") lines := make(chan logLine, 2000) go func() { scanner := bufio.NewScanner(stdout) scanner.Buffer(make([]byte, 1024*1024), 4*1024*1024) for scanner.Scan() { var e journalEntry if json.Unmarshal(scanner.Bytes(), &e) != nil { continue } msg := e.message() if msg == "" { continue } unit := e.Unit if unit == "" { unit = e.Comm } if unit == "" { unit = "kernel" } select { case lines <- logLine{ts: e.tsNs(time.Now().UnixNano()), unit: unit, line: msg}: default: // buffer full: drop rather than block the reader } } }() ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() var buf []logLine flush := func() { if len(buf) == 0 { return } byUnit := map[string][]logLine{} for _, it := range buf { byUnit[it.unit] = append(byUnit[it.unit], it) } for unit, items := range byUnit { ts := make([]int64, len(items)) ln := make([]string, len(items)) for i, it := range items { ts[i] = it.ts ln[i] = it.line } labels := map[string]string{"instance": cfg.Node, "job": "journald", "unit": unit} if err := infra.PushLokiStream(cfg.LokiURL, cfg.User, cfg.Pass, labels, ts, ln); err != nil { log.Printf("logs: push error (unit=%s, %d lines): %v", unit, len(items), err) } } buf = buf[:0] } for { select { case it := <-lines: buf = append(buf, it) if len(buf) >= 500 { flush() } case <-ticker.C: flush() case <-ctx.Done(): flush() return } } }