Skip to content

Instantly share code, notes, and snippets.

@17twenty
Created May 31, 2025 07:10
Show Gist options
  • Save 17twenty/8a53dbd074309b35edd7cad0972a20fa to your computer and use it in GitHub Desktop.
Save 17twenty/8a53dbd074309b35edd7cad0972a20fa to your computer and use it in GitHub Desktop.
Pretty good example of using the functions of River Queue and (re)scheduling at defined times
package main
import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)
// Connection settings that main assembles into the Postgres URL.
//
// NOTE(review): the credentials are hard-coded; presumably this is meant for
// local development only — confirm before reusing outside an example.
var (
	// DB_HOST and DB_PORT locate the Postgres server.
	DB_HOST = "localhost"
	DB_PORT = 5432

	// DB_NAME is the database to connect to.
	DB_NAME = "hodie-db"

	// DB_USER and DB_PASSWORD are the login credentials.
	DB_USER     = "local"
	DB_PASSWORD = "asecurepassword"
)
// LogHeartbeatJob is the (empty) argument payload for heartbeat jobs.
type LogHeartbeatJob struct{}

// Kind returns the identifier under which heartbeat jobs are enqueued.
func (LogHeartbeatJob) Kind() string {
	return "log_heartbeat"
}
// LogHeartbeatWorker processes LogHeartbeatJob jobs. It embeds
// river.WorkerDefaults; only Work is defined explicitly.
type LogHeartbeatWorker struct {
	river.WorkerDefaults[LogHeartbeatJob]
}

// Work writes a single log line for the heartbeat and always reports success.
// Neither the context nor the job payload is consulted.
func (*LogHeartbeatWorker) Work(_ context.Context, _ *river.Job[LogHeartbeatJob]) error {
	log.Println("Heartbeat job executed")
	return nil
}
// EggplantJob is the (empty) argument payload for the periodic eggplant job.
type EggplantJob struct{}

// Kind returns the identifier under which eggplant jobs are enqueued.
func (EggplantJob) Kind() string {
	return "eggplant"
}
// EggplantWorker processes EggplantJob jobs. It embeds river.WorkerDefaults;
// only Work is defined explicitly.
type EggplantWorker struct {
	river.WorkerDefaults[EggplantJob]
}

// Work logs the eggplant announcement and always reports success.
//
// The message previously contained mis-encoded bytes in place of the eggplant
// emoji (U+1F346); the literal now carries the intended character.
func (w *EggplantWorker) Work(ctx context.Context, job *river.Job[EggplantJob]) error {
	log.Println("🍆 A wild Eggplant has appeared!")
	return nil
}
// PauseAllJobs runs an UPDATE through the client's driver executor that moves
// every job of kind jobKind from state 'pending' to state 'paused'.
// It returns whatever error the executor reports.
//
// NOTE(review): River's stock schema appears to name this table river_job and
// its job-state enum does not appear to include 'paused' — confirm the
// statement matches the deployed schema before relying on it.
func PauseAllJobs(ctx context.Context, client *river.Client[pgx.Tx], jobKind string) error {
	const pauseQuery = "\n" +
		"UPDATE river_jobs\n" +
		"SET state = 'paused'\n" +
		"WHERE kind = $1 AND state = 'pending'\n"

	executor := client.Driver().GetExecutor()
	_, err := executor.Exec(ctx, pauseQuery, jobKind)
	return err
}
// ResumeAllJobs runs an UPDATE through the client's driver executor that moves
// every job of kind jobKind from state 'paused' back to state 'pending'.
// It returns whatever error the executor reports.
//
// NOTE(review): River's stock schema appears to name this table river_job and
// its job-state enum does not appear to include 'paused' — confirm the
// statement matches the deployed schema before relying on it.
func ResumeAllJobs(ctx context.Context, client *river.Client[pgx.Tx], jobKind string) error {
	const resumeQuery = "\n" +
		"UPDATE river_jobs\n" +
		"SET state = 'pending'\n" +
		"WHERE kind = $1 AND state = 'paused'\n"

	executor := client.Driver().GetExecutor()
	_, err := executor.Exec(ctx, resumeQuery, jobKind)
	return err
}
// main connects to Postgres, registers the workers and one periodic job with
// River, starts the client, inserts a job scheduled 15 seconds ahead, and
// launches the two scheduling goroutines.
//
// It then waits for SIGINT/SIGTERM. On a signal the scheduling goroutines'
// context is cancelled and the River client is asked to stop (bounded by a
// timeout), after which the deferred pool close runs. Previously main blocked
// on an empty select, so the context was never cancelled and none of the
// cleanup could ever execute.
func main() {
	ctx := context.Background()

	// sigCtx is cancelled when the process is asked to terminate. It is used
	// for the scheduling goroutines and for waiting below; the River client
	// itself is started with the uncancelled ctx and shut down via Stop.
	sigCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
	defer stopSignals()

	// Postgres setup
	dbpool, err := pgxpool.New(ctx, "postgres://"+DB_USER+":"+DB_PASSWORD+"@"+DB_HOST+":"+strconv.Itoa(DB_PORT)+"/"+DB_NAME)
	if err != nil {
		log.Fatalf("Failed to create database pool: %v", err)
	}
	defer dbpool.Close()

	// River setup
	driver := riverpgxv5.New(dbpool)
	workers := river.NewWorkers()
	river.AddWorker(workers, &LogHeartbeatWorker{})
	river.AddWorker(workers, &EggplantWorker{})
	river.AddWorker(workers, &ScheduledWorker{})

	periodic := []*river.PeriodicJob{
		river.NewPeriodicJob(
			river.PeriodicInterval(3*time.Second),
			func() (river.JobArgs, *river.InsertOpts) {
				return EggplantJob{}, nil
			},
			&river.PeriodicJobOpts{RunOnStart: true},
		),
	}

	client, err := river.NewClient[pgx.Tx](driver, &river.Config{
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 10},
		},
		Workers:      workers,
		PeriodicJobs: periodic,
	})
	if err != nil {
		log.Fatalf("Failed to create River client: %v", err)
	}

	// Start workers. Start is called synchronously so that the Stop call at
	// the end of main can never race with a still-pending Start.
	if err := client.Start(ctx); err != nil {
		log.Fatalf("River client failed to start: %v", err)
	}

	// NOTE(review): the emoji at the end of this message looks mis-encoded in
	// this copy of the file; confirm the intended character.
	_, err = client.Insert(ctx,
		ScheduledArgs{
			Message: "hello from the future ๐ŸŒ",
		},
		&river.InsertOpts{
			ScheduledAt: time.Now().Add(15 * time.Second),
		},
	)
	if err != nil {
		log.Fatalf("Failed to insert scheduled job: %v", err)
	}

	// Schedule jobs
	go startEvery10Seconds(sigCtx, client)
	go runEveryMinuteAt(sigCtx, client, 3) // run at second 3 of every minute

	// Block until a termination signal arrives.
	<-sigCtx.Done()
	// Restore default signal handling so a second signal ends the process
	// immediately if the graceful stop hangs.
	stopSignals()

	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := client.Stop(shutdownCtx); err != nil {
		log.Printf("River client did not stop cleanly: %v", err)
	}
}
// ScheduledArgs is the argument payload for the scheduled job.
type ScheduledArgs struct {
	// Message is the text the worker prints; serialized as "message".
	Message string `json:"message"`
}

// Kind returns the identifier under which scheduled jobs are enqueued.
func (ScheduledArgs) Kind() string {
	return "scheduled"
}
// ScheduledWorker processes ScheduledArgs jobs. It embeds
// river.WorkerDefaults; only Work is defined explicitly.
type ScheduledWorker struct {
	river.WorkerDefaults[ScheduledArgs]
}

// Work prints the job's message to standard output and always reports success.
func (*ScheduledWorker) Work(_ context.Context, job *river.Job[ScheduledArgs]) error {
	msg := job.Args.Message
	fmt.Printf("Message: %s\n", msg)
	return nil
}
// Some thoughts on deduplication, to avoid inserting duplicate jobs if we crash and re-send on restart:
// _, err := client.Insert(ctx, MyJob{UserID: "123"}, &river.InsertOpts{
// DedupID: "user-123@example.com",
// })
// We can also make sure we don't break business logic by performing the work and the job insert in a single transaction:
//
// err = pgx.BeginFunc(ctx, dbpool, func(tx pgx.Tx) error {
// // do business logic
// _, err := client.InsertTx(ctx, tx, MyJob{}, nil)
// return err
// })
// startEvery10Seconds inserts a LogHeartbeatJob every ten seconds until ctx is
// cancelled. Insert failures are logged and the loop keeps going.
//
// The ticker is stopped when the function returns so its resources are
// released once ctx is done.
func startEvery10Seconds(ctx context.Context, client *river.Client[pgx.Tx]) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if _, err := client.Insert(ctx, LogHeartbeatJob{}, nil); err != nil {
				log.Printf("Error inserting 10s job: %v", err)
			}
		case <-ctx.Done():
			return
		}
	}
}
// runEveryMinuteAt inserts a LogHeartbeatJob each time the wall clock reaches
// the given second of a minute, until ctx is cancelled. Insert failures are
// logged and the loop keeps going.
//
// Unlike a plain time.Sleep loop, the wait is interruptible: cancelling ctx
// makes the function return promptly instead of sleeping on forever.
func runEveryMinuteAt(ctx context.Context, client *river.Client[pgx.Tx], second int) {
	timer := time.NewTimer(time.Until(nextMinuteMark(time.Now(), second)))
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			if _, err := client.Insert(ctx, LogHeartbeatJob{}, nil); err != nil {
				log.Printf("Error inserting minute job: %v", err)
			}
			// The timer has fired and its channel is drained, so Reset is safe.
			timer.Reset(time.Until(nextMinuteMark(time.Now(), second)))
		case <-ctx.Done():
			return
		}
	}
}

// nextMinuteMark returns the first instant strictly after now whose position
// within its minute is the given second. If that mark is still ahead in the
// current minute it is used; otherwise the mark one minute later is returned,
// so the first run is not skipped when the function starts early in a minute.
func nextMinuteMark(now time.Time, second int) time.Time {
	next := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), second, 0, now.Location())
	if !next.After(now) {
		next = next.Add(time.Minute)
	}
	return next
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment