Created
May 22, 2025 07:03
-
-
Save yusufsyaifudin/1daa1137234507dd49b5da6cf574b778 to your computer and use it in GitHub Desktop.
golang worker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"log/slog" | |
"os" | |
"os/signal" | |
"sync" | |
"sync/atomic" | |
"syscall" | |
"time" | |
) | |
// Job represents a unit of work submitted to a WorkerPool.
type Job struct {
	// Ctx is handed to the logger when the job is processed. Publish
	// replaces a nil Ctx with context.Background().
	Ctx context.Context
	// ID identifies the job in log output.
	ID string
	// Payload is the data attached to the job; the placeholder worker only
	// logs it.
	Payload any
}

// WorkerPool manages a fixed set of worker goroutines that process Jobs
// taken from a bounded queue.
//
// Publish and Close are safe to call from multiple goroutines. Create a
// pool with NewWorkerPool; the zero value is not usable.
type WorkerPool struct {
	// queueData is the bounded job queue shared by all workers.
	queueData chan Job
	// queueNum counts jobs that were accepted and are not yet finished
	// (queued plus in progress).
	queueNum atomic.Int64
	// wg tracks the worker goroutines so Close can wait for them to exit.
	wg *sync.WaitGroup
	// once makes Close idempotent.
	once sync.Once
	// closed is set once shutdown has begun; read lock-free by Stats.
	closed atomic.Bool
	// mu orders Publish against Close: senders hold the read lock while
	// they check closed and send, and Close holds the write lock while it
	// closes queueData, so a send can never hit a closed channel.
	mu sync.RWMutex
}

// NewWorkerPool creates a WorkerPool and starts its workers.
//
// numWorkers is the number of worker goroutines; values below 1 are raised
// to 1 because a pool without workers could never drain its queue.
// queueSize is the capacity of the job queue; negative values are treated
// as 0 (unbuffered) because make panics on a negative channel size.
func NewWorkerPool(numWorkers int, queueSize int) *WorkerPool {
	if numWorkers < 1 {
		numWorkers = 1
	}
	if queueSize < 0 {
		queueSize = 0
	}

	wp := &WorkerPool{
		queueData: make(chan Job, queueSize),
		wg:        &sync.WaitGroup{},
	}

	// Register every worker before starting any of them so that a Close
	// racing with start-up still waits for all of them.
	wp.wg.Add(numWorkers)
	for i := 1; i <= numWorkers; i++ {
		go wp.worker(i)
	}

	slog.Info("worker pool started",
		slog.Int("workers", numWorkers),
		slog.Int("queue_size", queueSize))
	return wp
}

// worker processes jobs from the queue until the queue is closed and fully
// drained, then signals its exit through wp.wg.
func (wp *WorkerPool) worker(workerID int) {
	defer wp.wg.Done()

	slog.Info("worker started", slog.Int("worker_id", workerID))
	// Ranging over the channel keeps delivering buffered jobs after close,
	// so jobs accepted before shutdown are still processed.
	for msg := range wp.queueData {
		t0 := time.Now()
		// Process the job here (placeholder for actual work)
		time.Sleep(50 * time.Millisecond) // Simulate work
		slog.InfoContext(msg.Ctx, "processed job",
			slog.String("job_id", msg.ID),
			slog.Any("payload", msg.Payload),
			slog.Int64("queue_num", wp.queueNum.Load()),
			slog.Int("queue_size", len(wp.queueData)),
			slog.Int("worker_id", workerID),
			slog.String("duration", time.Since(t0).String()),
		)
		wp.queueNum.Add(-1)
	}
	slog.Info("worker stopped", slog.Int("worker_id", workerID))
}

// Publish offers a single job to the pool without blocking.
//
// It returns true when the job was queued, and false when the pool is
// closed or the queue is full. A nil job.Ctx is replaced with
// context.Background().
func (wp *WorkerPool) Publish(job Job) bool {
	// Set context if not provided
	if job.Ctx == nil {
		job.Ctx = context.Background()
	}

	// Hold the read lock across the closed check and the send so Close
	// cannot close the channel in between. The send below never blocks, so
	// the lock is only held briefly.
	wp.mu.RLock()
	defer wp.mu.RUnlock()

	if wp.closed.Load() {
		slog.Warn("attempted to publish to closed worker pool", slog.String("job_id", job.ID))
		return false
	}

	// Count the job before sending so a fast worker cannot decrement the
	// counter below zero; undo the increment if the queue rejects the job.
	wp.queueNum.Add(1)
	select {
	case wp.queueData <- job:
		return true
	default:
		// Channel is full, job rejected
		wp.queueNum.Add(-1)
		slog.Warn("worker pool queue full, job rejected", slog.String("job_id", job.ID))
		return false
	}
}

// Close gracefully shuts down the worker pool. It stops accepting new jobs,
// lets the workers finish every job that was already accepted, and returns
// after all workers have exited. Calling Close more than once is safe.
func (wp *WorkerPool) Close() {
	wp.once.Do(func() {
		slog.Info("shutting down worker pool...")

		// Take the write lock so no Publish is between its closed check and
		// its send while the channel is being closed.
		wp.mu.Lock()
		wp.closed.Store(true)
		close(wp.queueData)
		wp.mu.Unlock()

		// Workers drain the remaining queued jobs and then exit.
		wp.wg.Wait()
		slog.Info("worker pool shut down complete")
	})
}

// Stats returns current pool statistics: queueNum is the number of accepted
// jobs that are not yet finished, queueSize is the number of jobs currently
// waiting in the queue, and isClosed reports whether shutdown has begun.
func (wp *WorkerPool) Stats() (queueNum int64, queueSize int, isClosed bool) {
	return wp.queueNum.Load(), len(wp.queueData), wp.closed.Load()
}
func main() { | |
// Create worker pool that will run throughout the program | |
wp := NewWorkerPool(3, 10) | |
// Set up graceful shutdown | |
sigChan := make(chan os.Signal, 1) | |
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) | |
// Simulate injecting worker pool to other functions | |
go func() { | |
for i := 1; i <= 5; i++ { | |
userID := fmt.Sprintf("user-%d", i) | |
job := Job{ | |
Ctx: context.WithValue(context.Background(), "user_id", userID), | |
ID: fmt.Sprintf("%s-request-%d", userID, i), | |
Payload: fmt.Sprintf("processing request %d for user %s", i, userID), | |
} | |
if !wp.Publish(job) { | |
slog.Error("failed to publish job", slog.String("job_id", job.ID)) | |
} | |
time.Sleep(200 * time.Millisecond) | |
} | |
}() | |
// Print stats periodically | |
go func() { | |
ticker := time.NewTicker(2 * time.Second) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-ticker.C: | |
queueNum, queueSize, closed := wp.Stats() | |
if closed { | |
return | |
} | |
slog.Info("worker pool stats", | |
slog.Int64("pending_jobs", queueNum), | |
slog.Int("queue_size", queueSize), | |
) | |
case <-sigChan: | |
return | |
} | |
} | |
}() | |
slog.Info("worker pool is running... Press Ctrl+C to shutdown") | |
// Wait for shutdown signal | |
<-sigChan | |
slog.Info("shutdown signal received") | |
// Gracefully shutdown the worker pool | |
wp.Close() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment