From 2176e9d4428d1c4c84dc4a0ea0de654c6fc41f5f Mon Sep 17 00:00:00 2001
From: Egutierrez
Date: Sun, 7 Jun 2026 13:22:00 +0200
Subject: [PATCH] =?UTF-8?q?feat:=20shipping=20de=20logs=20journald=20a=20L?=
 =?UTF-8?q?oki=20(config=20loki=5Furl=20+=20shipper=20journalctl=E2=86=92P?=
 =?UTF-8?q?ushLokiStream)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 config.go |  10 ++--
 logs.go   | 175 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 main.go   |  25 ++++++---
 3 files changed, 201 insertions(+), 9 deletions(-)
 create mode 100644 logs.go

diff --git a/config.go b/config.go
index 9529129..5c4f36e 100644
--- a/config.go
+++ b/config.go
@@ -11,10 +11,11 @@ import (
 // systemd drop-ins and for deploying the same binary to many nodes.
 type Config struct {
 	Node        string `json:"node"`         // value of the "instance" label attached to every series
-	HubURL      string `json:"hub_url"`      // full ingest URL, e.g. https://metrics-…/api/v1/import/prometheus
-	User        string `json:"user"`         // basic-auth user (empty disables auth)
+	HubURL      string `json:"hub_url"`      // full metrics ingest URL, e.g. https://metrics-…/api/v1/import/prometheus
+	LokiURL     string `json:"loki_url"`     // full Loki push URL, e.g. https://logs-…/loki/api/v1/push (empty disables log shipping)
+	User        string `json:"user"`         // basic-auth user, shared by metrics and logs (empty disables auth)
 	Pass        string `json:"pass"`         // basic-auth password
-	IntervalSec int    `json:"interval_sec"` // push period in seconds (default 15)
+	IntervalSec int    `json:"interval_sec"` // metrics push period in seconds (default 15)
 }
 
 // defaultConfig returns the baseline configuration: the machine hostname as the
@@ -44,6 +45,9 @@ func loadConfig(path string) (Config, error) {
 	if v := os.Getenv("FLEET_HUB_URL"); v != "" {
 		cfg.HubURL = v
 	}
+	if v := os.Getenv("FLEET_LOKI_URL"); v != "" {
+		cfg.LokiURL = v
+	}
 	if v := os.Getenv("FLEET_USER"); v != "" {
 		cfg.User = v
 	}
diff --git a/logs.go b/logs.go
new file mode 100644
index 0000000..260c7a2
--- /dev/null
+++ b/logs.go
@@ -0,0 +1,175 @@
+package main
+
+import (
+	"bufio"
+	"context"
+	"encoding/json"
+	"log"
+	"os/exec"
+	"strconv"
+	"time"
+
+	"fn-registry/functions/infra"
+)
+
+// journalEntry is the subset of fields we read from `journalctl -o json`.
+type journalEntry struct {
+	Message  json.RawMessage `json:"MESSAGE"`
+	Unit     string          `json:"_SYSTEMD_UNIT"`
+	Comm     string          `json:"_COMM"`
+	Realtime string          `json:"__REALTIME_TIMESTAMP"` // microseconds since epoch, as a string
+	Priority string          `json:"PRIORITY"`
+}
+
+// message decodes MESSAGE, which journald serialises either as a JSON string
+// (normal text) or as an array of byte values (binary/non-UTF8 logs).
+func (e journalEntry) message() string {
+	if len(e.Message) == 0 {
+		return ""
+	}
+	if e.Message[0] == '"' {
+		var s string
+		if json.Unmarshal(e.Message, &s) == nil {
+			return s
+		}
+	}
+	var arr []int
+	if json.Unmarshal(e.Message, &arr) == nil {
+		b := make([]byte, len(arr))
+		for i, v := range arr {
+			b[i] = byte(v)
+		}
+		return string(b)
+	}
+	return ""
+}
+
+// tsNs returns the entry timestamp in nanoseconds, falling back to now.
+func (e journalEntry) tsNs(now int64) int64 {
+	us, err := strconv.ParseInt(e.Realtime, 10, 64)
+	if err != nil || us == 0 {
+		return now
+	}
+	return us * 1000
+}
+
+// logLine is one journal message queued for shipping to Loki.
+type logLine struct {
+	ts   int64  // entry timestamp in nanoseconds (see journalEntry.tsNs)
+	unit string // value of the "unit" stream label
+	line string // decoded MESSAGE text
+}
+
+// shipJournald follows the systemd journal and pushes new lines to Loki in
+// batches, grouped into one stream per unit. It returns when ctx is cancelled
+// or when journalctl stops producing output; in both cases the lines still
+// buffered are pushed first and the child process is reaped.
+// If journalctl is not available (e.g. on Android/Termux) it logs once and exits
+// without error, leaving metrics shipping unaffected.
+func shipJournald(ctx context.Context, cfg Config) {
+	cmd := exec.CommandContext(ctx, "journalctl", "-f", "-o", "json", "-n", "0", "--no-pager")
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		log.Printf("logs: cannot pipe journalctl: %v", err)
+		return
+	}
+	if err := cmd.Start(); err != nil {
+		log.Printf("logs: journalctl unavailable, log shipping disabled: %v", err)
+		return
+	}
+	log.Print("logs: journald shipping started")
+
+	lines := make(chan logLine, 2000)
+	go func() {
+		// Closing the channel tells the loop below that the reader is done,
+		// so a dead journalctl is noticed instead of idling forever.
+		defer close(lines)
+		scanner := bufio.NewScanner(stdout)
+		scanner.Buffer(make([]byte, 1024*1024), 4*1024*1024)
+		for scanner.Scan() {
+			var e journalEntry
+			if json.Unmarshal(scanner.Bytes(), &e) != nil {
+				continue
+			}
+			msg := e.message()
+			if msg == "" {
+				continue
+			}
+			unit := e.Unit
+			if unit == "" {
+				unit = e.Comm
+			}
+			if unit == "" {
+				unit = "kernel"
+			}
+			select {
+			case lines <- logLine{ts: e.tsNs(time.Now().UnixNano()), unit: unit, line: msg}:
+			default: // buffer full: drop rather than block the reader
+			}
+		}
+		// Scan also stops on read errors and on entries above the 4 MiB cap.
+		if err := scanner.Err(); err != nil && ctx.Err() == nil {
+			log.Printf("logs: reading journalctl output: %v", err)
+		}
+	}()
+
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+	var buf []logLine
+
+	flush := func() {
+		if len(buf) == 0 {
+			return
+		}
+		byUnit := map[string][]logLine{}
+		for _, it := range buf {
+			byUnit[it.unit] = append(byUnit[it.unit], it)
+		}
+		for unit, items := range byUnit {
+			ts := make([]int64, len(items))
+			ln := make([]string, len(items))
+			for i, it := range items {
+				ts[i] = it.ts
+				ln[i] = it.line
+			}
+			labels := map[string]string{"instance": cfg.Node, "job": "journald", "unit": unit}
+			if err := infra.PushLokiStream(cfg.LokiURL, cfg.User, cfg.Pass, labels, ts, ln); err != nil {
+				log.Printf("logs: push error (unit=%s, %d lines): %v", unit, len(items), err)
+			}
+		}
+		buf = buf[:0]
+	}
+
+	for {
+		select {
+		case it, ok := <-lines:
+			if !ok {
+				// Reader finished: journalctl exited, its output became
+				// unreadable, or ctx was cancelled.
+				flush()
+				// Kill has no effect if journalctl already exited; otherwise
+				// it keeps Wait from blocking on a child nobody reads from.
+				_ = cmd.Process.Kill()
+				if err := cmd.Wait(); ctx.Err() == nil {
+					log.Printf("logs: journalctl stopped, log shipping disabled: %v", err)
+				}
+				return
+			}
+			buf = append(buf, it)
+			if len(buf) >= 500 {
+				flush()
+			}
+		case <-ticker.C:
+			flush()
+		case <-ctx.Done():
+			// CommandContext kills journalctl on cancellation, which ends the
+			// reader and closes lines; pick up whatever is still queued.
+			for it := range lines {
+				buf = append(buf, it)
+			}
+			flush()
+			_ = cmd.Wait() // reap the child; its "killed" status is expected here
+			return
+		}
+	}
+}
diff --git a/main.go b/main.go
index 37685bb..be24ed2 100644
--- a/main.go
+++ b/main.go
@@ -15,6 +15,7 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"log"
 	"os"
@@ -46,14 +47,27 @@ func main() {
 		return
 	}
 
-	ticker := time.NewTicker(time.Duration(cfg.IntervalSec) * time.Second)
-	defer ticker.Stop()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 
 	stop := make(chan os.Signal, 1)
 	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
+	go func() {
+		<-stop
+		log.Print("shutting down")
+		cancel()
+	}()
 
-	// Push once right away so a freshly started node shows up immediately,
-	// then keep pushing on every tick.
+	// Optional: ship systemd journal logs to Loki in the background.
+	if cfg.LokiURL != "" {
+		go shipJournald(ctx, cfg)
+	}
+
+	ticker := time.NewTicker(time.Duration(cfg.IntervalSec) * time.Second)
+	defer ticker.Stop()
+
+	// Push metrics once right away so a freshly started node shows up
+	// immediately, then keep pushing on every tick.
 	if err := pushOnce(cfg); err != nil {
 		log.Printf("push error: %v", err)
 	}
@@ -63,8 +77,7 @@ func main() {
 			if err := pushOnce(cfg); err != nil {
 				log.Printf("push error: %v", err)
 			}
-		case <-stop:
-			log.Print("shutting down")
+		case <-ctx.Done():
 			return
 		}
 	}