package main import ( "bufio" "bytes" "context" "encoding/json" "io" "log" "os" "os/exec" "strconv" "strings" "time" "fn-registry/functions/infra" ) // shipLogs picks the right log source for the platform and ships to Loki: // systemd journald on Linux servers, logcat on Android/Termux. If neither is // available it logs once and returns, leaving metrics shipping unaffected. // // Binaries are located with os.Stat (not exec.LookPath) and run by absolute // path: on Android the faccessat2 syscall that LookPath uses is blocked by // seccomp and crashes the process with SIGSYS. func shipLogs(ctx context.Context, cfg Config) { // Android/Termux: a shell helper writes `logcat` output to cfg.LogFile and // we tail it (no exec, which seccomp would kill via pidfd_open SIGSYS). if cfg.LogFile != "" { shipFileTail(ctx, cfg, cfg.LogFile, "logcat") return } // Linux servers: read the systemd journal directly. if p := findBin("/usr/bin/journalctl", "/bin/journalctl"); p != "" { shipJournald(ctx, cfg, p) return } log.Print("logs: no log source (no log_file nor journalctl), log shipping disabled") } // findBin returns the first candidate path that exists, or "". func findBin(candidates ...string) string { for _, c := range candidates { if _, err := os.Stat(c); err == nil { return c } } return "" } // journalEntry is the subset of fields we read from `journalctl -o json`. type journalEntry struct { Message json.RawMessage `json:"MESSAGE"` Unit string `json:"_SYSTEMD_UNIT"` Comm string `json:"_COMM"` Realtime string `json:"__REALTIME_TIMESTAMP"` // microseconds since epoch, as a string Priority string `json:"PRIORITY"` } // message decodes MESSAGE, which journald serialises either as a JSON string // (normal text) or as an array of byte values (binary/non-UTF8 logs). 
func (e journalEntry) message() string { if len(e.Message) == 0 { return "" } if e.Message[0] == '"' { var s string if json.Unmarshal(e.Message, &s) == nil { return s } } var arr []int if json.Unmarshal(e.Message, &arr) == nil { b := make([]byte, len(arr)) for i, v := range arr { b[i] = byte(v) } return string(b) } return "" } // tsNs returns the entry timestamp in nanoseconds, falling back to now. func (e journalEntry) tsNs(now int64) int64 { us, err := strconv.ParseInt(e.Realtime, 10, 64) if err != nil || us == 0 { return now } return us * 1000 } // stream is a key identifying a Loki stream (one set of labels). type logLine struct { ts int64 unit string line string } // shipJournald follows the systemd journal and pushes new lines to Loki in // batches, grouped into one stream per unit. It returns when ctx is cancelled. // If journalctl is not available (e.g. on Android/Termux) it logs once and exits // without error, leaving metrics shipping unaffected. func shipJournald(ctx context.Context, cfg Config, binPath string) { cmd := exec.CommandContext(ctx, binPath, "-f", "-o", "json", "-n", "0", "--no-pager") stdout, err := cmd.StdoutPipe() if err != nil { log.Printf("logs: cannot pipe journalctl: %v", err) return } if err := cmd.Start(); err != nil { log.Printf("logs: journalctl unavailable, log shipping disabled: %v", err) return } log.Print("logs: journald shipping started") lines := make(chan logLine, 2000) go func() { scanner := bufio.NewScanner(stdout) scanner.Buffer(make([]byte, 1024*1024), 4*1024*1024) for scanner.Scan() { var e journalEntry if json.Unmarshal(scanner.Bytes(), &e) != nil { continue } msg := e.message() if msg == "" { continue } unit := e.Unit if unit == "" { unit = e.Comm } if unit == "" { unit = "kernel" } select { case lines <- logLine{ts: e.tsNs(time.Now().UnixNano()), unit: unit, line: msg}: default: // buffer full: drop rather than block the reader } } }() ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() var buf []logLine 
flush := func() { if len(buf) == 0 { return } byUnit := map[string][]logLine{} for _, it := range buf { byUnit[it.unit] = append(byUnit[it.unit], it) } for unit, items := range byUnit { ts := make([]int64, len(items)) ln := make([]string, len(items)) for i, it := range items { ts[i] = it.ts ln[i] = it.line } labels := map[string]string{"instance": cfg.Node, "job": "journald", "unit": unit} if err := infra.PushLokiStream(cfg.LokiURL, cfg.User, cfg.Pass, labels, ts, ln); err != nil { log.Printf("logs: push error (unit=%s, %d lines): %v", unit, len(items), err) } } buf = buf[:0] } for { select { case it := <-lines: buf = append(buf, it) if len(buf) >= 500 { flush() } case <-ticker.C: flush() case <-ctx.Done(): flush() return } } } // shipFileTail tails a growing log file (written by an external shell helper, // e.g. `logcat -v epoch` on Android/Termux) and pushes new lines to Loki under // one stream (job=). It does NO exec — only file reads — so it is safe on // Android where exec from Go is blocked by seccomp. Handles truncation/rotation // by detecting a shrinking file and restarting from offset 0. 
func shipFileTail(ctx context.Context, cfg Config, path, job string) { log.Printf("logs: tailing %s for Loki (job=%s)", path, job) var offset int64 if fi, err := os.Stat(path); err == nil { offset = fi.Size() // skip pre-existing history on first start } labels := map[string]string{"instance": cfg.Node, "job": job} ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: f, err := os.Open(path) if err != nil { continue } fi, err := f.Stat() if err != nil { f.Close() continue } if fi.Size() < offset { offset = 0 // file was truncated or rotated } if fi.Size() == offset { f.Close() continue } if _, err := f.Seek(offset, io.SeekStart); err != nil { f.Close() continue } data, err := io.ReadAll(f) f.Close() if err != nil { continue } // Only consume up to the last complete line; keep the remainder for // the next tick so we never ship a half-written line. lastNL := bytes.LastIndexByte(data, '\n') if lastNL < 0 { continue } offset += int64(lastNL + 1) var ts []int64 var ln []string now := time.Now().UnixNano() for _, raw := range strings.Split(string(data[:lastNL]), "\n") { raw = strings.TrimSpace(raw) if raw == "" || strings.HasPrefix(raw, "---------") { continue } t, msg := parseLogcatEpoch(raw, now) ts = append(ts, t) ln = append(ln, msg) } if len(ln) == 0 { continue } if err := infra.PushLokiStream(cfg.LokiURL, cfg.User, cfg.Pass, labels, ts, ln); err != nil { log.Printf("logs: file push error (%d lines): %v", len(ln), err) } } } } // parseLogcatEpoch splits a `-v epoch` logcat line into a nanosecond timestamp // and the remaining text. Lines look like: "1609459200.123 1234 1235 I Tag: msg". // On any parse failure it returns the fallback timestamp and the raw line. 
func parseLogcatEpoch(raw string, fallback int64) (int64, string) {
	// The timestamp is parsed with integer arithmetic rather than float64:
	// at ~1.6e18 ns a float64 can only represent multiples of 256 ns, and
	// ParseFloat would also accept tokens such as "NaN", "Inf", "-5" or
	// "1e9" as timestamps.
	const (
		nsPerSec   = 1000000000
		maxSecs    = (1<<63 - 1) / nsPerSec // larger values would overflow int64 nanoseconds
		fracDigits = 9                      // nanosecond resolution
	)

	// The timestamp is the first space-delimited token; a line without a
	// space, or one starting with a space, is not in the expected format.
	sp := strings.IndexByte(raw, ' ')
	if sp <= 0 {
		return fallback, raw
	}
	stamp := raw[:sp]

	// Split "seconds[.fraction]".
	whole, frac := stamp, ""
	if dot := strings.IndexByte(stamp, '.'); dot >= 0 {
		whole, frac = stamp[:dot], stamp[dot+1:]
	}

	// ParseUint rejects signs, empty input and anything that is not a plain
	// decimal number.
	secs, err := strconv.ParseUint(whole, 10, 64)
	if err != nil || secs >= maxSecs {
		return fallback, raw
	}

	// Scale the fraction to exactly nine digits: validate every character,
	// keep at most the first nine, then pad with zeros on the right.
	var ns uint64
	for i := 0; i < len(frac); i++ {
		c := frac[i]
		if c < '0' || c > '9' {
			return fallback, raw
		}
		if i < fracDigits {
			ns = ns*10 + uint64(c-'0')
		}
	}
	for i := len(frac); i < fracDigits; i++ {
		ns *= 10
	}

	return int64(secs*nsPerSec + ns), strings.TrimSpace(raw[sp:])
}