Created
May 27, 2022 09:23
Revisions
-
klauspost created this gist
May 27, 2022 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,654 @@ //go:build ignore // +build ignore package main // Adapted from : https://gist.github.com/arnehormann/65421048f56ac108f6b5 import ( "bufio" "bytes" "encoding/binary" "flag" "fmt" "io" "io/ioutil" "log" "os" "runtime" "runtime/pprof" "sync" "time" flstd "compress/flate" gzstd "compress/gzip" "github.com/andybalholm/brotli" "github.com/biogo/hts/bgzf" "github.com/dgryski/go-quicklz" "github.com/golang/snappy" flkp "github.com/klauspost/compress/flate" gzkp "github.com/klauspost/compress/gzip" "github.com/klauspost/compress/s2" zskp "github.com/klauspost/compress/zstd" "github.com/klauspost/dedup" pgz "github.com/klauspost/pgzip" "github.com/klauspost/readahead" "golang.org/x/build/pargzip" //"github.com/rasky/go-lzo" "github.com/pierrec/lz4" "github.com/ulikunitz/xz/lzma" zstd "github.com/valyala/gozstd" //"github.com/DataDog/zstd" //"github.com/youtube/vitess/go/cgzip" ) type NoOp struct{} func (n NoOp) Read(v []byte) (int, error) { return len(v), nil } func (n NoOp) Write(v []byte) (int, error) { return len(v), nil } type SeqGen struct { i int } func (s *SeqGen) Read(v []byte) (int, error) { b := byte(s.i) for i := range v { v[i], b = b, b+1 } return len(v), nil } type closeWrap struct { close func() } func (c closeWrap) Close() error { c.close() return nil } type Rand struct { // uses PCG (http://www.pcg-random.org/) state uint64 inc uint64 } const pcgmult64 = 6364136223846793005 func NewRand(seed uint64) *Rand { state := uint64(0) inc := uint64(seed<<1) | 1 state = state*pcgmult64 + (inc | 1) state += uint64(seed) state = state*pcgmult64 + (inc | 1) return &Rand{ state: state, inc: inc, } } func (r *Rand) Read(v []byte) (int, error) { for w := v; len(w) > 0; w = w[4:] { old := r.state r.state = r.state*pcgmult64 + (r.inc | 1) xorshifted := uint32(((old >> 18) ^ old) >> 27) rot := uint32(old >> 59) rnd := (xorshifted >> rot) | (xorshifted << ((-rot) & 31)) // ok because len(v) % 4 == 0 binary.LittleEndian.PutUint32(w, rnd) } return len(v), nil } type wcounter struct { n int out io.Writer } func (w *wcounter) Write(p []byte) (n int, err error) { n, err = w.out.Write(p) w.n += n return n, err } /* func (w *wcounter) Close() (err error) { cl, ok := w.out.(io.Closer) if ok { return cl.Close() } return nil } */ var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") func main() { rmode := "raw" wmode := "gzkp" wlevel := -1 in := "-" out := "-" cpu := 0 stats := false mem := false header := true numRuns := 1 var closers []func() error flag.StringVar(&rmode, "r", rmode, "read mode (raw|flatekp|flatestd|gzkp|pgzip|cgzip|gzstd|zero|seq|rand)") flag.StringVar(&wmode, "w", wmode, "write mode (raw|flatekp|flatestd|gzkp|pgzip|gzstd|cgzip|none)") flag.StringVar(&in, "in", rmode, "input file name, default is '-', stdin") flag.StringVar(&out, "out", rmode, "input file name, default is '-', stdin") flag.IntVar(&wlevel, "l", wlevel, "compression level (-2|-1|0..9)") flag.IntVar(&cpu, "cpu", cpu, "GOMAXPROCS number (0|1...)") flag.IntVar(&numRuns, "n", numRuns, "Number of times to run.") flag.BoolVar(&stats, "stats", false, "show stats") flag.BoolVar(&header, "header", true, "show stats header") flag.BoolVar(&mem, "mem", false, "load source file into memory") flag.Parse() if flag.NArg() > 0 { flag.PrintDefaults() } cpu = runtime.GOMAXPROCS(cpu) if wlevel < -3 || 9 < wlevel { panic("compression level -l=x must be (-3,0..9)") } if *cpuprofile != "" { f, err := os.Create(*cpuprofile) if err != nil { log.Fatal(err) } pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } var err error var wg sync.WaitGroup var r io.Reader if in == "-" { r = os.Stdin } else { if !mem { r, err = os.Open(in) if err != nil { panic(err) } //r, _ = readahead.NewReaderSize(r, 10, 8<<20) } else { b, err := ioutil.ReadFile(in) if err != nil { panic(err) } for i := 0; i < numRuns; i++ { if r == nil { r = bytes.NewBuffer(b) } else { r = io.MultiReader(r, bytes.NewBuffer(b)) } } } } var source bool switch rmode { case "zero": // NoOp writes what the original buffer contained unchanged. // As that buffer is initialized with 0 and not changed, // NoOp is usable as a very fast zero-reader. r = NoOp{} source = true case "seq": r = &SeqGen{} source = true case "rand": r = NewRand(0xdeadbeef) source = true case "raw": case "mem": b, err := ioutil.ReadFile(in) if err != nil { panic(err) } r = bytes.NewBuffer(b) case "gzkp": var gzr *gzkp.Reader if gzr, err = gzkp.NewReader(r); err == nil { closers = append(closers, gzr.Close) r = gzr } case "bgzf": var gzr *bgzf.Reader if gzr, err = bgzf.NewReader(r, cpu); err == nil { closers = append(closers, gzr.Close) r = gzr } case "pgzip": var gzr *pgz.Reader if gzr, err = pgz.NewReader(r); err == nil { closers = append(closers, gzr.Close) r = gzr } /* case "cgzip": var gzr io.ReadCloser if gzr, err = cgzip.NewReader(r); err == nil { closers = append(closers, gzr.Close) r = gzr } */ case "gzstd": var gzr *gzstd.Reader if gzr, err = gzstd.NewReader(r); err == nil { closers = append(closers, gzr.Close) r = gzr } case "flatekp": fr := flkp.NewReader(r) closers = append(closers, fr.Close) r = fr case "flatestd": fr := flstd.NewReader(r) closers = append(closers, fr.Close) r = fr case "lzma": lr, err := lzma.NewReader(r) if err != nil { panic(err) } r = lr case "lzma2": lr, err := lzma.NewReader2(r) if err != nil { panic(err) } r = lr case "lz4": lr := lz4.NewReader(r) r = lr case "zstd": zr := zstd.NewReader(r) //closers = append(closers, zr.Close) r = zr case "s2": sr := s2.NewReader(r) r = sr case "snappy": sr := snappy.NewReader(r) r = sr default: panic("read mode -r=x must be (raw|flatekp|flatestd|gzkp|gzstd|zero|seq|rand)") } if err != nil { panic(err) } //r = ioutil.NopCloser(r) verifyNone := func(r *io.PipeReader) { n, err := io.Copy(ioutil.Discard, r) if err != nil { fmt.Println("Error reading input:", err) os.Exit(1) } else { fmt.Println("Not verified! bytes:", n) } wg.Done() } verifyDeflate := func(r *io.PipeReader) { reahah, _ := readahead.NewReaderSize(r, 10, 10<<20) gzr := flkp.NewReader(reahah) n, err := io.Copy(ioutil.Discard, gzr) if err != nil { fmt.Println("Error reading input:", err) os.Exit(1) } else { fmt.Println("Read back OK (No CRC)! bytes:", n) } wg.Done() } verifyGzip := func(r *io.PipeReader) { reahah, _ := readahead.NewReaderSize(r, 10, 10<<20) gzr, err := gzkp.NewReader(reahah) if err != nil { panic(err) } n, err := io.Copy(ioutil.Discard, gzr) if err != nil { fmt.Println("Error reading input:", err) os.Exit(1) } else { fmt.Println("Read back OK (gzip CRC verified)! bytes:", n) } wg.Done() } verifyS2 := func(r *io.PipeReader) { reahah, _ := readahead.NewReaderSize(r, 10, 10<<20) gzr := s2.NewReader(reahah) n, err := io.Copy(ioutil.Discard, gzr) if err != nil { fmt.Println("Error reading input:", err) os.Exit(1) } else { fmt.Println("Read back OK (CRC verified)! bytes:", n) } wg.Done() } verifyZstd := func(r *io.PipeReader) { reahah, _ := readahead.NewReaderSize(r, 10, 10<<20) gzr := zstd.NewReader(reahah) n, err := io.Copy(ioutil.Discard, gzr) if err != nil { fmt.Println("Error reading input:", err) os.Exit(1) } else { fmt.Println("Read back OK (CRC verified)! bytes:", n) } wg.Done() } var verifySrc *io.PipeReader var verify = verifyNone var w io.Writer if out == "-" { w = os.Stdout } else if out == "*" { w = ioutil.Discard out = "discard" } else if out == "verify" { preader, pwriter := io.Pipe() closers = append(closers, pwriter.Close) biow := bufio.NewWriterSize(pwriter, 10<<20) closers = append(closers, biow.Flush) verifySrc = preader w = biow out = "verify" } else { f, err := os.Create(out) if err != nil { panic(err) } closers = append(closers, f.Close) iow := bufio.NewWriter(f) closers = append(closers, iow.Flush) w = iow } outSize := &wcounter{out: w} w = outSize var sink bool switch wmode { case "none": w = NoOp{} sink = true case "raw": case "gzkp": var gzw *gzkp.Writer if gzw, err = gzkp.NewWriterLevel(w, wlevel); err == nil { closers = append(closers, gzw.Close) w = gzw } verify = verifyGzip case "pgzip": var gzw *pgz.Writer if gzw, err = pgz.NewWriterLevel(w, wlevel); err == nil { closers = append(closers, gzw.Close) w = gzw } verify = verifyGzip case "bgzf": var gzw *bgzf.Writer if gzw, err = bgzf.NewWriterLevel(w, wlevel, cpu); err == nil { closers = append(closers, gzw.Close) w = gzw } case "pargzip": var gzw *pargzip.Writer gzw = pargzip.NewWriter(w) //gzw.UseSystemGzip = false closers = append(closers, gzw.Close) w = gzw /* case "cgzip": var gzw *cgzip.Writer if gzw, err = cgzip.NewWriterLevel(w, wlevel); err == nil { closers = append(closers, gzw.Close) w = gzw }*/ verify = verifyGzip case "br": brw := brotli.NewWriterLevel(w, wlevel) closers = append(closers, brw.Close) w = brw case "gzstd": var gzw *gzstd.Writer if gzw, err = gzstd.NewWriterLevel(w, wlevel); err == nil { closers = append(closers, gzw.Close) w = gzw } verify = verifyGzip case "dedup": var ddw dedup.Writer if ddw, err = dedup.NewStreamWriter(w, dedup.ModeDynamic, 8192, 1000*8192); err == nil { closers = append(closers, ddw.Close) w = ddw } case "s2": const blockSize = 4 << 20 var sw *s2.Writer switch wlevel { case 0: sw = s2.NewWriter(w, s2.WriterUncompressed(), s2.WriterBlockSize(blockSize), s2.WriterConcurrency(cpu)) case 1: sw = s2.NewWriter(w, s2.WriterBlockSize(blockSize), s2.WriterConcurrency(cpu)) case 2: sw = s2.NewWriter(w, s2.WriterBetterCompression(), s2.WriterBlockSize(blockSize), s2.WriterConcurrency(cpu)) case 3: sw = s2.NewWriter(w, s2.WriterBestCompression(), s2.WriterBlockSize(blockSize), s2.WriterConcurrency(cpu)) default: panic("invalid compression level") } w = sw closers = append(closers, sw.Close) verify = verifyS2 case "s2s": var sw *s2.Writer switch wlevel { case 0: sw = s2.NewWriter(w, s2.WriterUncompressed(), s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu)) case 1: sw = s2.NewWriter(w, s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu)) case 2: sw = s2.NewWriter(w, s2.WriterBetterCompression(), s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu)) case 3: sw = s2.NewWriter(w, s2.WriterBestCompression(), s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu)) default: panic("invalid compression level") } w = sw closers = append(closers, sw.Close) verify = verifyS2 case "snappy": sw := snappy.NewWriter(w) w = sw /* case "lzo1x": sw := lzo.NewWriter(w, wlevel) w = sw*/ verify = verifyS2 case "flatekp": if wlevel == -3 { gzw := flkp.NewStatelessWriter(w) closers = append(closers, gzw.Close) w = gzw break } var fw *flkp.Writer if fw, err = flkp.NewWriter(w, wlevel); err == nil { closers = append(closers, fw.Close) w = fw } verify = verifyDeflate case "flatestd": var fw *flstd.Writer if fw, err = flstd.NewWriter(w, wlevel); err == nil { closers = append(closers, fw.Close) w = fw } verify = verifyDeflate case "lzma": wc := lzma.WriterConfig{ Properties: nil, DictCap: 0, BufSize: 0, Matcher: 0, SizeInHeader: false, Size: 0, EOSMarker: true, } if lw, err := wc.NewWriter(w); err == nil { closers = append(closers, lw.Close) w = lw } case "lzma2": wc := lzma.Writer2Config{ Properties: nil, DictCap: 0, BufSize: 0, Matcher: 0, } if lw, err := wc.NewWriter2(w); err == nil { closers = append(closers, lw.Close) w = lw } case "lz4": lw := lz4.NewWriter(w) lw.Apply(lz4.ConcurrencyOption(cpu), lz4.CompressionLevelOption(lz4.CompressionLevel(wlevel))) closers = append(closers, lw.Close) w = lw verify = func(r *io.PipeReader) { reahah, _ := readahead.NewReaderSize(r, 10, 10<<20) gzr := lz4.NewReader(reahah) n, err := io.Copy(ioutil.Discard, gzr) if err != nil { fmt.Println("Error reading input:", err) os.Exit(1) } else { fmt.Println("Read back OK (CRC checked)! bytes:", n) } wg.Done() } case "zstd": zw := zstd.NewWriterLevel(w, wlevel) closers = append(closers, zw.Close) w = zw verify = verifyZstd case "zskp": zw, err := zskp.NewWriter(w, zskp.WithEncoderLevel(zskp.EncoderLevel(wlevel)), zskp.WithEncoderConcurrency(cpu)) if err != nil { panic(err) } closers = append(closers, zw.Close) w = zw verify = verifyZstd case "s2zs": pr, pw := io.Pipe() var sw *s2.Writer switch wlevel { case 0: sw = s2.NewWriter(pw, s2.WriterUncompressed(), s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu)) case 1: sw = s2.NewWriter(pw, s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu)) case 2: sw = s2.NewWriter(pw, s2.WriterBetterCompression(), s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu)) case 3: sw = s2.NewWriter(pw, s2.WriterBestCompression(), s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu)) } ra := readahead.NewReader(pr) conv := zskp.SnappyConverter{} var wg sync.WaitGroup wg.Add(1) go func(w io.Writer) { defer wg.Done() _, err := conv.Convert(ra, w) pr.CloseWithError(err) }(w) w = sw closers = append(closers, closeWrap{wg.Wait}.Close, pw.Close, sw.Close) verify = verifyZstd case "qlz": qlw := quicklz.NewWriter(w, -wlevel) closers = append(closers, qlw.Close) w = qlw default: panic("write mode -w=x must be (raw|flatekp|flatestd|gzkp|pgzip|gzstd|none)") } if err != nil { panic(err) } if source && sink { return } if verifySrc != nil { wg.Add(1) go verify(verifySrc) } type directWriter interface { EncodeBuffer(buf []byte) (err error) } inSize := int64(0) start := time.Now() func() { if dw, ok := w.(directWriter); ok { if eb, ok := r.(*bytes.Buffer); ok { inSize += int64(eb.Len()) err := dw.EncodeBuffer(eb.Bytes()) if err != nil { panic(err) } for i := len(closers) - 1; i >= 0; i-- { closers[i]() } return } } nr, err := io.Copy(w, r) inSize += nr if err != nil && err != io.EOF { panic(err) } for i := len(closers) - 1; i >= 0; i-- { closers[i]() } }() if stats { elapsed := time.Since(start) wg.Wait() if header { fmt.Printf("file\tout\tlevel\tinsize\toutsize\tmillis\tmb/s\n") //fmt.Printf("file\tin\tout\tlevel\tcpu\tinsize\toutsize\tmillis\tmb/s\n") } mbpersec := (float64(inSize) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second))) //fmt.Printf("%s\t%s\t%s\t%d\t%d\t%d\t%d\t%d\t%.02f\n", in, rmode, wmode, wlevel, cpu, inSize, outSize.n, elapsed/time.Millisecond, mbpersec) fmt.Printf("%s\t%s\t%d\t%d\t%d\t%d\t%.02f\n", in, wmode, wlevel, inSize, outSize.n, elapsed/time.Millisecond, mbpersec) } else { wg.Wait() } }