Skip to content

Instantly share code, notes, and snippets.

@cideM
Last active September 8, 2022 13:41

Revisions

  1. cideM revised this gist Sep 8, 2022. 1 changed file with 23 additions and 21 deletions.
    44 changes: 23 additions & 21 deletions ten.go
    Original file line number Diff line number Diff line change
    @@ -12,36 +12,38 @@ func main() {
    outputChannel := make(chan string)
    errorChannel := make(chan error)

    limit := runtime.NumCPU()
    limit := int64(runtime.NumCPU())
    sem := semaphore.NewWeighted(limit)

    go func() {
    for s := range inputChannel {
    for {
    select {
    case <-ctx.Done():
    break
    default:
    case s, ok := <-inputChannel:
    if ok {
    if err := sem.Acquire(ctx, 1); err != nil {
    log.Printf("Failed to acquire semaphore: %v", err)
    break
    }

    go func(s string) {
    defer sem.Release(1)
    time.Sleep(time.Second * 3)

    result := strings.ToLower(s)
    outputChannel <- result
    }(s)
    } else {
    if err := sem.Acquire(ctx, limit); err != nil {
    log.Printf("Failed to acquire semaphore: %v", err)
    }
    close(outputChannel)
    close(errorChannel)
    }
    }

    if err := sem.Acquire(ctx, 1); err != nil {
    log.Printf("Failed to acquire semaphore: %v", err)
    break
    }

    go func(s string) {
    defer sem.Release(1)
    time.Sleep(time.Second * 3)

    result := strings.ToLower(s)
    outputChannel <- result
    }(s)
    }

    if err := sem.Acquire(ctx, limit); err != nil {
    log.Printf("Failed to acquire semaphore: %v", err)
    }
    close(outputChannel)
    close(errorChannel)
    }()

    sink(ctx, cancel, outputChannel, errorChannel)
  2. cideM created this gist Sep 20, 2021.
    48 changes: 48 additions & 0 deletions ten.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,48 @@
    func main() {
    source := []string{"FOO", "BAR", "BAX"}

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    inputChannel, err := producer(ctx, source)
    if err != nil {
    log.Fatal(err)
    }

    outputChannel := make(chan string)
    errorChannel := make(chan error)

    limit := runtime.NumCPU()
    sem := semaphore.NewWeighted(limit)

    go func() {
    for s := range inputChannel {
    select {
    case <-ctx.Done():
    break
    default:
    }

    if err := sem.Acquire(ctx, 1); err != nil {
    log.Printf("Failed to acquire semaphore: %v", err)
    break
    }

    go func(s string) {
    defer sem.Release(1)
    time.Sleep(time.Second * 3)

    result := strings.ToLower(s)
    outputChannel <- result
    }(s)
    }

    if err := sem.Acquire(ctx, limit); err != nil {
    log.Printf("Failed to acquire semaphore: %v", err)
    }
    close(outputChannel)
    close(errorChannel)
    }()

    sink(ctx, cancel, outputChannel, errorChannel)
    }