Created
December 8, 2020 01:00
-
-
Save ajm188/cee1f237dfe16f68241531f9ebcaec31 to your computer and use it in GitHub Desktop.
ShowAllKeyspaces stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func (s *VtctldServer) ShowAllKeyspaces(req *vtctldatapb.ShowAllKeyspacesRequest, stream vtctlservicepb.Vtctld_ShowAllKeyspacesServer) error { | |
ctx := stream.Context() | |
keyspaces, err := s.ts.GetKeyspaces(ctx) | |
if err != nil { | |
return err | |
} | |
wg := sync.WaitGroup{} | |
ch := make(chan string) | |
results := make(chan *vtctldatapb.Keyspace, len(keyspaces)) | |
er := concurrency.AllErrorRecorder{} | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
defer close(results) // tell the stream sender goroutine when we're done producing results | |
for keyspace := range ch { | |
ks, err := s.GetKeyspace(ctx, &vtctldatapb.GetKeyspaceRequest{Keyspace: keyspace}) | |
if err != nil { | |
er.RecordError(err) | |
return | |
} | |
if er.HasErrors() { | |
return | |
} | |
results <- ks | |
} | |
}() | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for ks := range results { | |
if err := stream.Send(ks); err != nil { | |
er.RecordError(err) | |
return | |
} | |
} | |
}() | |
done := make(chan bool) | |
go func() { | |
wg.Wait() | |
done <- true | |
}() | |
for _, keyspace := range keyspaces { | |
select { | |
// In the normal path, this should never block. | |
case ch <- keyspace: | |
case <-ctx.Done(): | |
close(ch) | |
return ctx.Err() | |
case <-done: | |
// If we're done before we sent all the keyspaces, then someone else | |
// had an error. | |
close(ch) | |
return er.Error() | |
} | |
} | |
// We sent our work successfully, close the input channel. We never need to | |
// close the results channel, because that is managed by the producer | |
// goroutine. | |
close(ch) | |
<-done | |
return er.Error() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment