Files
fn_registry/functions/infra/cron_ticker.go
T
egutierrez 9c0d24d3ef feat: funciones Go — core (cron, join_by_key, validate_struct), datascience (pivot, diff_entities), infra (http, cache, cron_ticker)
Nuevas funciones Go con tests en tres dominios:
- core: parse_cron_expr, next_cron_time, join_by_key, validate_struct_fields + tipo CronSchedule
- datascience: pivot (tabla dinámica), diff_entities (comparación de entidades)
- infra: http_get_json, http_post_json, http_download_file, cache_to_sqlite, cron_ticker

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:12 +02:00

137 lines
3.3 KiB
Go

package infra
import (
"context"
"time"
)
// cronSchedule mirrors core.CronSchedule to avoid cross-package import.
// In practice, callers should use core.ParseCronExpr and pass the result here.
// The struct is duplicated to respect the registry rule of no cross-domain imports
// between function packages.
//
// CronTickerSchedule is the schedule consumed by CronTicker.
type CronTickerSchedule struct {
Minute []int
Hour []int
DayOfMonth []int
Month []int
DayOfWeek []int
}
// CronTicker creates a channel that emits the current time whenever the given
// schedule fires. It uses time.NewTimer internally, recalculating the next tick
// after each emission. The channel is closed when ctx is cancelled.
func CronTicker(schedule CronTickerSchedule, ctx context.Context) <-chan time.Time {
ch := make(chan time.Time, 1)
go func() {
defer close(ch)
for {
next := cronTickerNext(schedule, time.Now())
if next.IsZero() {
// Impossible schedule — nothing to emit.
return
}
delay := time.Until(next)
timer := time.NewTimer(delay)
select {
case <-ctx.Done():
timer.Stop()
return
case tick := <-timer.C:
select {
case ch <- tick:
default:
// Drop if consumer is not ready.
}
}
}
}()
return ch
}
// cronTickerNext finds the next time after `after` that satisfies the schedule.
// Returns zero time if no match within 366 days.
func cronTickerNext(s CronTickerSchedule, after time.Time) time.Time {
t := after.Truncate(time.Minute).Add(time.Minute)
limit := after.Add(366 * 24 * time.Hour)
for t.Before(limit) {
if !cronIntIn(int(t.Month()), s.Month) {
t = cronNextMonth(t, s.Month)
if t.IsZero() {
return time.Time{}
}
continue
}
domOK := cronIntIn(t.Day(), s.DayOfMonth)
dowOK := cronIntIn(int(t.Weekday()), s.DayOfWeek)
if !domOK || !dowOK {
t = time.Date(t.Year(), t.Month(), t.Day()+1, 0, 0, 0, 0, t.Location())
continue
}
if !cronIntIn(t.Hour(), s.Hour) {
next := cronNextHour(t, s.Hour)
if next.IsZero() {
t = time.Date(t.Year(), t.Month(), t.Day()+1, 0, 0, 0, 0, t.Location())
} else {
t = next
}
continue
}
if !cronIntIn(t.Minute(), s.Minute) {
next := cronNextMinute(t, s.Minute)
if next.IsZero() {
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, 0, 0, 0, t.Location())
} else {
t = next
}
continue
}
return t
}
return time.Time{}
}
func cronIntIn(v int, s []int) bool {
for _, x := range s {
if x == v {
return true
}
}
return false
}
func cronNextMonth(t time.Time, months []int) time.Time {
month := int(t.Month())
for _, m := range months {
if m > month {
return time.Date(t.Year(), time.Month(m), 1, 0, 0, 0, 0, t.Location())
}
}
if len(months) > 0 {
return time.Date(t.Year()+1, time.Month(months[0]), 1, 0, 0, 0, 0, t.Location())
}
return time.Time{}
}
func cronNextHour(t time.Time, hours []int) time.Time {
h := t.Hour()
for _, hh := range hours {
if hh > h {
return time.Date(t.Year(), t.Month(), t.Day(), hh, 0, 0, 0, t.Location())
}
}
return time.Time{}
}
func cronNextMinute(t time.Time, minutes []int) time.Time {
m := t.Minute()
for _, mm := range minutes {
if mm > m {
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), mm, 0, 0, t.Location())
}
}
return time.Time{}
}