Created
May 31, 2025 07:10
-
-
Save 17twenty/8a53dbd074309b35edd7cad0972a20fa to your computer and use it in GitHub Desktop.
Pretty good example of using the functions of River Queue and (re)scheduling at defined times
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)
// Connection settings used by main to build the Postgres connection URL.
//
// NOTE(review): these look like local-development credentials hard-coded in
// source; confirm they are never used outside a local environment.
var (
	// DB_HOST and DB_PORT locate the Postgres server.
	DB_HOST = "localhost"
	DB_PORT = 5432

	// DB_NAME is the database selected in the connection URL path.
	DB_NAME = "hodie-db"

	// DB_USER and DB_PASSWORD form the userinfo part of the connection URL.
	DB_USER     = "local"
	DB_PASSWORD = "asecurepassword"
)
// LogHeartbeatJob is the (field-less) argument payload for heartbeat jobs.
type LogHeartbeatJob struct{}

// Kind reports the job kind string for heartbeat jobs.
func (LogHeartbeatJob) Kind() string {
	const kind = "log_heartbeat"
	return kind
}
type LogHeartbeatWorker struct { | |
river.WorkerDefaults[LogHeartbeatJob] | |
} | |
func (w *LogHeartbeatWorker) Work(ctx context.Context, job *river.Job[LogHeartbeatJob]) error { | |
log.Println("Heartbeat job executed") | |
return nil | |
} | |
// EggplantJob is the (field-less) argument payload for eggplant jobs.
type EggplantJob struct{}

// Kind reports the job kind string for eggplant jobs.
func (EggplantJob) Kind() string {
	const kind = "eggplant"
	return kind
}
type EggplantWorker struct { | |
river.WorkerDefaults[EggplantJob] | |
} | |
func (w *EggplantWorker) Work(ctx context.Context, job *river.Job[EggplantJob]) error { | |
log.Println("๐ A wild Eggplant has appeared!") | |
return nil | |
} | |
func PauseAllJobs(ctx context.Context, client *river.Client[pgx.Tx], jobKind string) error { | |
sql := ` | |
UPDATE river_jobs | |
SET state = 'paused' | |
WHERE kind = $1 AND state = 'pending' | |
` | |
_, err := client.Driver().GetExecutor().Exec(ctx, sql, jobKind) | |
return err | |
} | |
func ResumeAllJobs(ctx context.Context, client *river.Client[pgx.Tx], jobKind string) error { | |
sql := ` | |
UPDATE river_jobs | |
SET state = 'pending' | |
WHERE kind = $1 AND state = 'paused' | |
` | |
_, err := client.Driver().GetExecutor().Exec(ctx, sql, jobKind) | |
return err | |
} | |
func main() { | |
ctx := context.Background() | |
// Postgres setup | |
dbpool, err := pgxpool.New(ctx, "postgres://"+DB_USER+":"+DB_PASSWORD+"@"+DB_HOST+":"+strconv.Itoa(DB_PORT)+"/"+DB_NAME) | |
if err != nil { | |
log.Fatalf("Failed to create database pool: %v", err) | |
} | |
defer dbpool.Close() | |
// River setup | |
driver := riverpgxv5.New(dbpool) | |
workers := river.NewWorkers() | |
river.AddWorker(workers, &LogHeartbeatWorker{}) | |
river.AddWorker(workers, &EggplantWorker{}) | |
river.AddWorker(workers, &ScheduledWorker{}) | |
periodic := []*river.PeriodicJob{ | |
river.NewPeriodicJob( | |
river.PeriodicInterval(3*time.Second), | |
func() (river.JobArgs, *river.InsertOpts) { | |
return EggplantJob{}, nil | |
}, | |
&river.PeriodicJobOpts{RunOnStart: true}, | |
), | |
} | |
client, err := river.NewClient[pgx.Tx](driver, &river.Config{ | |
Queues: map[string]river.QueueConfig{ | |
river.QueueDefault: {MaxWorkers: 10}, | |
}, | |
Workers: workers, | |
PeriodicJobs: periodic, | |
}) | |
if err != nil { | |
log.Fatalf("Failed to create River client: %v", err) | |
} | |
// Start workers | |
go func() { | |
if err := client.Start(ctx); err != nil { | |
log.Fatalf("River client failed to start: %v", err) | |
} | |
}() | |
_, err = client.Insert(ctx, | |
ScheduledArgs{ | |
Message: "hello from the future ๐", | |
}, | |
&river.InsertOpts{ | |
ScheduledAt: time.Now().Add(15 * time.Second), | |
}, | |
) | |
if err != nil { | |
log.Fatalf("Failed to insert scheduled job: %v", err) | |
} | |
// Schedule jobs | |
go startEvery10Seconds(ctx, client) | |
go runEveryMinuteAt(ctx, client, 3) // run at second 3 of every minute | |
// Block forever | |
select {} | |
} | |
// ScheduledArgs is the argument payload for "scheduled" jobs.
type ScheduledArgs struct {
	// Message is the text the worker prints; serialized as "message".
	Message string `json:"message"`
}

// Kind reports the job kind string for scheduled jobs.
func (ScheduledArgs) Kind() string {
	const kind = "scheduled"
	return kind
}
type ScheduledWorker struct { | |
river.WorkerDefaults[ScheduledArgs] | |
} | |
func (w *ScheduledWorker) Work(ctx context.Context, job *river.Job[ScheduledArgs]) error { | |
fmt.Printf("Message: %s\n", job.Args.Message) | |
return nil | |
} | |
// Some thoughts on deduplication, to avoid enqueueing the same job twice when
// we crash and resend on restart (sketch only; check River's InsertOpts for
// the actual uniqueness option):
// _, err := client.Insert(ctx, MyJob{UserID: "123"}, &river.InsertOpts{
// DedupID: "user-123@example.com",
// })
// We can also make sure we don't break business logic by enqueueing the job inside the same transaction as that logic:
// | |
// err = pgx.BeginFunc(ctx, dbpool, func(tx pgx.Tx) error { | |
// // do business logic | |
// _, err := client.InsertTx(ctx, tx, MyJob{}, nil) | |
// return err | |
// }) | |
func startEvery10Seconds(ctx context.Context, client *river.Client[pgx.Tx]) { | |
ticker := time.NewTicker(10 * time.Second) | |
for { | |
select { | |
case <-ticker.C: | |
_, err := client.Insert(ctx, LogHeartbeatJob{}, nil) | |
if err != nil { | |
log.Printf("Error inserting 10s job: %v", err) | |
} | |
case <-ctx.Done(): | |
return | |
} | |
} | |
} | |
func runEveryMinuteAt(ctx context.Context, client *river.Client[pgx.Tx], second int) { | |
for { | |
now := time.Now() | |
next := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute()+1, second, 0, now.Location()) | |
time.Sleep(time.Until(next)) | |
_, err := client.Insert(ctx, LogHeartbeatJob{}, nil) | |
if err != nil { | |
log.Printf("Error inserting minute job: %v", err) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment