Skip to content

Instantly share code, notes, and snippets.

@yusufsyaifudin
Created May 22, 2025 07:03
Show Gist options
  • Save yusufsyaifudin/1daa1137234507dd49b5da6cf574b778 to your computer and use it in GitHub Desktop.
Save yusufsyaifudin/1daa1137234507dd49b5da6cf574b778 to your computer and use it in GitHub Desktop.
golang worker
package main
import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
)
// Job represents a unit of work handed to a WorkerPool.
type Job struct {
	// Ctx is forwarded to the logger when the job is processed. Publish
	// replaces a nil Ctx with context.Background().
	Ctx context.Context
	// ID identifies the job in log output.
	ID string
	// Payload is the opaque data carried by the job.
	Payload any
}

// WorkerPool manages a fixed set of worker goroutines that process Jobs taken
// from a bounded queue.
//
// Create one with NewWorkerPool; the zero value is not usable. Publish, Close
// and Stats may be called from multiple goroutines.
type WorkerPool struct {
	queueData chan Job       // bounded queue of accepted jobs
	queueNum  atomic.Int64   // jobs accepted but not yet finished
	workers   sync.WaitGroup // tracks running worker goroutines
	once      sync.Once      // makes Close idempotent

	// mu orders the send in Publish (read lock) against the close of
	// queueData in Close (write lock), so a send can never hit a closed
	// channel.
	mu     sync.RWMutex
	closed atomic.Bool // set once Close has begun; read lock-free by Stats
}

// NewWorkerPool starts numWorkers worker goroutines that consume from a queue
// holding up to queueSize pending jobs, and returns the running pool.
//
// A numWorkers below 1 is raised to 1 (a pool without workers would accept
// jobs that are never processed), and a negative queueSize is treated as 0
// (make would panic on it). With a queueSize of 0 the queue is unbuffered, so
// Publish only succeeds while a worker is idle and waiting for a job.
func NewWorkerPool(numWorkers int, queueSize int) *WorkerPool {
	if numWorkers < 1 {
		slog.Warn("worker pool needs at least one worker, using 1",
			slog.Int("requested_workers", numWorkers))
		numWorkers = 1
	}
	if queueSize < 0 {
		slog.Warn("worker pool queue size cannot be negative, using 0",
			slog.Int("requested_queue_size", queueSize))
		queueSize = 0
	}

	wp := &WorkerPool{
		queueData: make(chan Job, queueSize),
	}

	// Register every worker before starting any of them so that Close can
	// never observe a partially counted set.
	wp.workers.Add(numWorkers)
	for i := 1; i <= numWorkers; i++ {
		go wp.worker(i)
	}

	slog.Info("worker pool started",
		slog.Int("workers", numWorkers),
		slog.Int("queue_size", queueSize))
	return wp
}

// worker processes jobs from the queue until the queue is closed and drained.
// workerID is only used to label log output.
func (wp *WorkerPool) worker(workerID int) {
	defer wp.workers.Done()

	slog.Info("worker started", slog.Int("worker_id", workerID))
	for msg := range wp.queueData {
		t0 := time.Now()
		// Process the job here (placeholder for actual work).
		time.Sleep(50 * time.Millisecond) // Simulate work
		slog.InfoContext(msg.Ctx, "processed job",
			slog.String("job_id", msg.ID),
			slog.Any("payload", msg.Payload),
			// Logged before the decrement below, so the count still
			// includes the job that was just processed.
			slog.Int64("queue_num", wp.queueNum.Load()),
			slog.Int("queue_size", len(wp.queueData)),
			slog.Int("worker_id", workerID),
			slog.String("duration", time.Since(t0).String()),
		)
		wp.queueNum.Add(-1)
	}
	slog.Info("worker stopped", slog.Int("worker_id", workerID))
}

// Publish offers a single job to the pool without blocking.
//
// It returns true when the job was queued. It returns false, and logs a
// warning, when the pool has been closed or when the queue is full. A nil
// job.Ctx is replaced with context.Background().
func (wp *WorkerPool) Publish(job Job) bool {
	if job.Ctx == nil {
		job.Ctx = context.Background()
	}

	// The closed check and the send happen under the read lock; Close takes
	// the write lock before closing the channel, so the two cannot interleave.
	wp.mu.RLock()
	closed := wp.closed.Load()
	queued := false
	if !closed {
		// Count the job before sending so a fast worker cannot decrement
		// the counter before it was incremented.
		wp.queueNum.Add(1)
		select {
		case wp.queueData <- job:
			queued = true
		default:
			// Channel is full, undo the optimistic count.
			wp.queueNum.Add(-1)
		}
	}
	wp.mu.RUnlock()

	// Log outside the lock so a slow log handler cannot delay Close.
	switch {
	case closed:
		slog.Warn("attempted to publish to closed worker pool", slog.String("job_id", job.ID))
		return false
	case !queued:
		slog.Warn("worker pool queue full, job rejected", slog.String("job_id", job.ID))
		return false
	}
	return true
}

// Close gracefully shuts down the worker pool.
//
// It stops accepting new jobs, lets the workers finish every job that was
// already queued, and returns once all worker goroutines have exited. Close
// is idempotent; concurrent callers block until the first call has finished.
func (wp *WorkerPool) Close() {
	wp.once.Do(func() {
		// Taking the write lock waits for in-flight Publish calls, after
		// which no further send on queueData can start.
		wp.mu.Lock()
		wp.closed.Store(true)
		close(wp.queueData)
		wp.mu.Unlock()

		slog.Info("shutting down worker pool...")
		// Workers keep ranging over the closed channel until the buffered
		// jobs are drained, then exit.
		wp.workers.Wait()
		slog.Info("worker pool shut down complete")
	})
}

// Stats returns current pool statistics: queueNum is the number of accepted
// jobs that have not finished yet, queueSize is the number of jobs currently
// waiting in the queue, and isClosed reports whether Close has been called.
// The three values are read independently and are not a consistent snapshot.
func (wp *WorkerPool) Stats() (queueNum int64, queueSize int, isClosed bool) {
	return wp.queueNum.Load(), len(wp.queueData), wp.closed.Load()
}
// main runs a small demonstration: it starts a worker pool, publishes a few
// jobs from a background goroutine, logs pool statistics periodically, and
// shuts everything down when SIGINT or SIGTERM arrives.
func main() {
	// Create worker pool that will run throughout the program.
	wp := NewWorkerPool(3, 10)

	// Set up graceful shutdown. Only main receives from sigChan: the channel
	// holds a single signal, so a second reader could consume it and leave
	// main blocked forever.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigChan)

	// done is closed once the shutdown signal arrived; closing it notifies
	// every background goroutine at once.
	done := make(chan struct{})
	var background sync.WaitGroup

	// Simulate injecting worker pool to other functions.
	background.Add(1)
	go func() {
		defer background.Done()
		for i := 1; i <= 5; i++ {
			userID := fmt.Sprintf("user-%d", i)
			job := Job{
				// NOTE(review): a plain string context key is flagged by
				// staticcheck (SA1029); consider a package-level key type.
				Ctx:     context.WithValue(context.Background(), "user_id", userID),
				ID:      fmt.Sprintf("%s-request-%d", userID, i),
				Payload: fmt.Sprintf("processing request %d for user %s", i, userID),
			}
			if !wp.Publish(job) {
				slog.Error("failed to publish job", slog.String("job_id", job.ID))
			}

			// Pause between jobs, but stop immediately on shutdown.
			select {
			case <-done:
				return
			case <-time.After(200 * time.Millisecond):
			}
		}
	}()

	// Print stats periodically.
	background.Add(1)
	go func() {
		defer background.Done()
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				queueNum, queueSize, closed := wp.Stats()
				if closed {
					return
				}
				slog.Info("worker pool stats",
					slog.Int64("pending_jobs", queueNum),
					slog.Int("queue_size", queueSize),
				)
			case <-done:
				return
			}
		}
	}()

	slog.Info("worker pool is running... Press Ctrl+C to shutdown")

	// Wait for shutdown signal.
	<-sigChan
	slog.Info("shutdown signal received")

	// Stop the background goroutines first so nothing publishes while the
	// pool is closing, then gracefully shut down the worker pool.
	close(done)
	background.Wait()
	wp.Close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment