Skip to content

Instantly share code, notes, and snippets.

@klauspost
Created May 27, 2022 09:23

Revisions

  1. klauspost created this gist May 27, 2022.
    654 changes: 654 additions & 0 deletions compress.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,654 @@
    //go:build ignore
    // +build ignore

    package main

    // Adapted from : https://gist.github.com/arnehormann/65421048f56ac108f6b5

    import (
    "bufio"
    "bytes"
    "encoding/binary"
    "flag"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "os"
    "runtime"
    "runtime/pprof"
    "sync"
    "time"

    flstd "compress/flate"
    gzstd "compress/gzip"

    "github.com/andybalholm/brotli"
    "github.com/biogo/hts/bgzf"
    "github.com/dgryski/go-quicklz"
    "github.com/golang/snappy"
    flkp "github.com/klauspost/compress/flate"
    gzkp "github.com/klauspost/compress/gzip"
    "github.com/klauspost/compress/s2"
    zskp "github.com/klauspost/compress/zstd"
    "github.com/klauspost/dedup"
    pgz "github.com/klauspost/pgzip"
    "github.com/klauspost/readahead"
    "golang.org/x/build/pargzip"

    //"github.com/rasky/go-lzo"

    "github.com/pierrec/lz4"
    "github.com/ulikunitz/xz/lzma"
    zstd "github.com/valyala/gozstd"
    //"github.com/DataDog/zstd"
    //"github.com/youtube/vitess/go/cgzip"
    )

    // NoOp is a stateless io.Reader and io.Writer that never touches the
    // buffer it is handed: it simply reports that the whole buffer was
    // processed.
    type NoOp struct{}

    // Read claims to have filled v completely while leaving its contents as-is.
    // It never returns an error and never signals EOF.
    func (NoOp) Read(v []byte) (int, error) {
    	filled := len(v)
    	return filled, nil
    }

    // Write claims to have consumed all of v and discards it.
    func (NoOp) Write(v []byte) (int, error) {
    	consumed := len(v)
    	return consumed, nil
    }

    // SeqGen is an endless io.Reader producing the byte sequence
    // 0, 1, 2, ..., 255, 0, 1, ... across successive Read calls.
    type SeqGen struct {
    	// i holds the value of the next byte to emit (always in 0..255 after
    	// the first Read).
    	i int
    }

    // Read fills v with consecutive byte values, continuing where the previous
    // call stopped. It always fills the whole buffer and never returns an error
    // or EOF.
    func (s *SeqGen) Read(v []byte) (int, error) {
    	b := byte(s.i)
    	for i := range v {
    		v[i], b = b, b+1
    	}
    	// Remember the position so the sequence is continuous for any read
    	// size; storing the wrapped byte value keeps i from ever overflowing.
    	s.i = int(b)
    	return len(v), nil
    }

    // closeWrap adapts a plain func() to the Close() error shape so it can be
    // queued alongside real closers.
    type closeWrap struct {
    	close func()
    }

    // Close invokes the wrapped function and always reports success.
    func (c closeWrap) Close() error {
    	fn := c.close
    	fn()
    	return nil
    }

    // Rand is an endless io.Reader of pseudo-random bytes.
    type Rand struct {
    	// uses PCG (http://www.pcg-random.org/)

    	state uint64
    	inc   uint64
    }

    // pcgmult64 is the multiplier of the 64-bit state update used by Rand.
    const pcgmult64 = 6364136223846793005

    // NewRand returns a generator whose output is fully determined by seed.
    func NewRand(seed uint64) *Rand {
    	state := uint64(0)
    	inc := uint64(seed<<1) | 1
    	state = state*pcgmult64 + (inc | 1)
    	state += uint64(seed)
    	state = state*pcgmult64 + (inc | 1)
    	return &Rand{
    		state: state,
    		inc:   inc,
    	}
    }

    // next advances the generator one step and returns 32 output bits.
    func (r *Rand) next() uint32 {
    	old := r.state
    	r.state = r.state*pcgmult64 + (r.inc | 1)
    	xorshifted := uint32(((old >> 18) ^ old) >> 27)
    	rot := uint32(old >> 59)
    	return (xorshifted >> rot) | (xorshifted << ((-rot) & 31))
    }

    // Read fills v with pseudo-random bytes, four at a time in little-endian
    // order. It always fills the whole buffer and never returns an error or EOF.
    //
    // Buffers whose length is not a multiple of four are supported: the final
    // 1-3 bytes are taken from the low-order bytes of one extra generator step.
    func (r *Rand) Read(v []byte) (int, error) {
    	w := v
    	for ; len(w) >= 4; w = w[4:] {
    		binary.LittleEndian.PutUint32(w, r.next())
    	}
    	if len(w) > 0 {
    		// A short tail cannot take PutUint32 directly (it would panic), so
    		// stage the word and copy only what fits.
    		var tail [4]byte
    		binary.LittleEndian.PutUint32(tail[:], r.next())
    		copy(w, tail[:])
    	}
    	return len(v), nil
    }

    // wcounter is an io.Writer that forwards to out and keeps a running total
    // of the bytes out reported as written.
    type wcounter struct {
    	n   int
    	out io.Writer
    }

    // Write passes p to the underlying writer, adds the number of bytes it
    // accepted to the running total, and returns the underlying result as-is.
    func (w *wcounter) Write(p []byte) (int, error) {
    	written, werr := w.out.Write(p)
    	w.n = w.n + written
    	return written, werr
    }

    /*
    func (w *wcounter) Close() (err error) {
    cl, ok := w.out.(io.Closer)
    if ok {
    return cl.Close()
    }
    return nil
    }
    */

    // cpuprofile is the -cpuprofile flag. When it is non-empty, main creates the
    // named file and writes a CPU profile to it.
    var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")

    // main copies data from a source selected by -r/-in through a compressor
    // selected by -w/-l into a destination selected by -out, optionally printing
    // throughput statistics.
    //
    // Flow:
    //  1. Parse flags and apply the -cpu setting.
    //  2. Build the input reader: stdin, a file, an in-memory copy of a file
    //     (repeated -n times), or a synthetic generator (zero|seq|rand),
    //     optionally wrapped in a decompressor.
    //  3. Build the output writer: stdout, a file, a discard sink ("*") or a pipe
    //     into a verifier goroutine ("verify"), wrapped in a byte counter and
    //     then in the chosen compressor.
    //  4. Copy, then run the registered closers in reverse registration order so
    //     that the outermost layer is finished first.
    //  5. Wait for the verifier (if any) and print stats when -stats is set.
    //
    // Setup failures panic; verification failures print a message and exit with
    // status 1.
    func main() {
    	rmode := "raw"
    	wmode := "gzkp"
    	wlevel := -1
    	in := "-"
    	out := "-"
    	cpu := 0
    	stats := false
    	mem := false
    	header := true
    	numRuns := 1
    	var closers []func() error

    	flag.StringVar(&rmode, "r", rmode, "read mode (raw|flatekp|flatestd|gzkp|pgzip|cgzip|gzstd|zero|seq|rand)")
    	flag.StringVar(&wmode, "w", wmode, "write mode (raw|flatekp|flatestd|gzkp|pgzip|gzstd|cgzip|none)")
    	// The defaults must be the in/out values ("-"), not the read mode;
    	// otherwise running without -in tries to open a file named "raw".
    	flag.StringVar(&in, "in", in, "input file name, default is '-', stdin")
    	flag.StringVar(&out, "out", out, "output file name, default is '-', stdout")
    	flag.IntVar(&wlevel, "l", wlevel, "compression level (-2|-1|0..9)")
    	flag.IntVar(&cpu, "cpu", cpu, "GOMAXPROCS number (0|1...)")
    	flag.IntVar(&numRuns, "n", numRuns, "Number of times to run.")
    	flag.BoolVar(&stats, "stats", false, "show stats")
    	flag.BoolVar(&header, "header", true, "show stats header")
    	flag.BoolVar(&mem, "mem", false, "load source file into memory")
    	flag.Parse()
    	if flag.NArg() > 0 {
    		flag.PrintDefaults()
    	}
    	// GOMAXPROCS returns the PREVIOUS setting, so query again to learn the
    	// value that is actually in effect; it is passed to the concurrent
    	// compressors below.
    	runtime.GOMAXPROCS(cpu)
    	cpu = runtime.GOMAXPROCS(0)

    	if wlevel < -3 || 9 < wlevel {
    		panic("compression level -l=x must be (-3,0..9)")
    	}

    	if *cpuprofile != "" {
    		f, err := os.Create(*cpuprofile)
    		if err != nil {
    			log.Fatal(err)
    		}
    		if err := pprof.StartCPUProfile(f); err != nil {
    			log.Fatal(err)
    		}
    		defer pprof.StopCPUProfile()
    	}

    	var err error
    	var wg sync.WaitGroup

    	// --- input -----------------------------------------------------------

    	var r io.Reader
    	switch {
    	case in == "-":
    		r = os.Stdin
    	case !mem:
    		r, err = os.Open(in)
    		if err != nil {
    			panic(err)
    		}
    		//r, _ = readahead.NewReaderSize(r, 10, 8<<20)
    	default:
    		b, rerr := ioutil.ReadFile(in)
    		if rerr != nil {
    			panic(rerr)
    		}
    		// Present the in-memory file numRuns times back to back.
    		for i := 0; i < numRuns; i++ {
    			if r == nil {
    				r = bytes.NewBuffer(b)
    			} else {
    				r = io.MultiReader(r, bytes.NewBuffer(b))
    			}
    		}
    	}
    	var source bool

    	switch rmode {
    	case "zero":
    		// NoOp writes what the original buffer contained unchanged.
    		// As that buffer is initialized with 0 and not changed,
    		// NoOp is usable as a very fast zero-reader.
    		r = NoOp{}
    		source = true
    	case "seq":
    		r = &SeqGen{}
    		source = true
    	case "rand":
    		r = NewRand(0xdeadbeef)
    		source = true
    	case "raw":
    	case "mem":
    		b, rerr := ioutil.ReadFile(in)
    		if rerr != nil {
    			panic(rerr)
    		}
    		r = bytes.NewBuffer(b)
    	case "gzkp":
    		var gzr *gzkp.Reader
    		if gzr, err = gzkp.NewReader(r); err == nil {
    			closers = append(closers, gzr.Close)
    			r = gzr
    		}
    	case "bgzf":
    		var gzr *bgzf.Reader
    		if gzr, err = bgzf.NewReader(r, cpu); err == nil {
    			closers = append(closers, gzr.Close)
    			r = gzr
    		}
    	case "pgzip":
    		var gzr *pgz.Reader
    		if gzr, err = pgz.NewReader(r); err == nil {
    			closers = append(closers, gzr.Close)
    			r = gzr
    		}
    		/* case "cgzip":
    		var gzr io.ReadCloser
    		if gzr, err = cgzip.NewReader(r); err == nil {
    		closers = append(closers, gzr.Close)
    		r = gzr
    		}
    		*/
    	case "gzstd":
    		var gzr *gzstd.Reader
    		if gzr, err = gzstd.NewReader(r); err == nil {
    			closers = append(closers, gzr.Close)
    			r = gzr
    		}
    	case "flatekp":
    		fr := flkp.NewReader(r)
    		closers = append(closers, fr.Close)
    		r = fr
    	case "flatestd":
    		fr := flstd.NewReader(r)
    		closers = append(closers, fr.Close)
    		r = fr
    	case "lzma":
    		lr, lerr := lzma.NewReader(r)
    		if lerr != nil {
    			panic(lerr)
    		}
    		r = lr
    	case "lzma2":
    		lr, lerr := lzma.NewReader2(r)
    		if lerr != nil {
    			panic(lerr)
    		}
    		r = lr
    	case "lz4":
    		r = lz4.NewReader(r)
    	case "zstd":
    		zr := zstd.NewReader(r)
    		//closers = append(closers, zr.Close)
    		r = zr
    	case "s2":
    		r = s2.NewReader(r)
    	case "snappy":
    		r = snappy.NewReader(r)
    	default:
    		panic("read mode -r=x must be (raw|flatekp|flatestd|gzkp|gzstd|zero|seq|rand)")
    	}
    	if err != nil {
    		panic(err)
    	}
    	//r = ioutil.NopCloser(r)

    	// --- verifiers -------------------------------------------------------

    	// finishVerify drains src, reports the outcome and releases wg. A read
    	// error terminates the process with status 1.
    	finishVerify := func(src io.Reader, okMsg string) {
    		n, err := io.Copy(ioutil.Discard, src)
    		if err != nil {
    			fmt.Println("Error reading input:", err)
    			os.Exit(1)
    		}
    		fmt.Println(okMsg, n)
    		wg.Done()
    	}
    	// buffered puts a read-ahead buffer between the pipe and the decoder.
    	// The constructor error is ignored: the buffer count and size are fixed
    	// constants here.
    	buffered := func(r *io.PipeReader) io.Reader {
    		ra, _ := readahead.NewReaderSize(r, 10, 10<<20)
    		return ra
    	}
    	verifyNone := func(r *io.PipeReader) {
    		finishVerify(r, "Not verified! bytes:")
    	}
    	verifyDeflate := func(r *io.PipeReader) {
    		finishVerify(flkp.NewReader(buffered(r)), "Read back OK (No CRC)! bytes:")
    	}
    	verifyGzip := func(r *io.PipeReader) {
    		gzr, err := gzkp.NewReader(buffered(r))
    		if err != nil {
    			panic(err)
    		}
    		finishVerify(gzr, "Read back OK (gzip CRC verified)! bytes:")
    	}
    	verifyS2 := func(r *io.PipeReader) {
    		finishVerify(s2.NewReader(buffered(r)), "Read back OK (CRC verified)! bytes:")
    	}
    	verifyZstd := func(r *io.PipeReader) {
    		finishVerify(zstd.NewReader(buffered(r)), "Read back OK (CRC verified)! bytes:")
    	}
    	var verifySrc *io.PipeReader
    	var verify = verifyNone

    	// --- output ----------------------------------------------------------

    	var w io.Writer
    	switch out {
    	case "-":
    		w = os.Stdout
    	case "*":
    		w = ioutil.Discard
    		out = "discard"
    	case "verify":
    		preader, pwriter := io.Pipe()
    		closers = append(closers, pwriter.Close)
    		biow := bufio.NewWriterSize(pwriter, 10<<20)
    		// Registered after the pipe close, so it runs before it.
    		closers = append(closers, biow.Flush)
    		verifySrc = preader
    		w = biow
    	default:
    		f, cerr := os.Create(out)
    		if cerr != nil {
    			panic(cerr)
    		}
    		closers = append(closers, f.Close)
    		iow := bufio.NewWriter(f)
    		// Registered after the file close, so it runs before it.
    		closers = append(closers, iow.Flush)
    		w = iow
    	}
    	outSize := &wcounter{out: w}
    	w = outSize

    	var sink bool
    	switch wmode {
    	case "none":
    		w = NoOp{}
    		sink = true
    	case "raw":
    	case "gzkp":
    		var gzw *gzkp.Writer
    		if gzw, err = gzkp.NewWriterLevel(w, wlevel); err == nil {
    			closers = append(closers, gzw.Close)
    			w = gzw
    		}
    		verify = verifyGzip
    	case "pgzip":
    		var gzw *pgz.Writer
    		if gzw, err = pgz.NewWriterLevel(w, wlevel); err == nil {
    			closers = append(closers, gzw.Close)
    			w = gzw
    		}
    		verify = verifyGzip
    	case "bgzf":
    		var gzw *bgzf.Writer
    		if gzw, err = bgzf.NewWriterLevel(w, wlevel, cpu); err == nil {
    			closers = append(closers, gzw.Close)
    			w = gzw
    		}
    	case "pargzip":
    		gzw := pargzip.NewWriter(w)
    		//gzw.UseSystemGzip = false
    		closers = append(closers, gzw.Close)
    		w = gzw
    		/* case "cgzip":
    		var gzw *cgzip.Writer
    		if gzw, err = cgzip.NewWriterLevel(w, wlevel); err == nil {
    		closers = append(closers, gzw.Close)
    		w = gzw
    		}*/
    		verify = verifyGzip
    	case "br":
    		brw := brotli.NewWriterLevel(w, wlevel)
    		closers = append(closers, brw.Close)
    		w = brw
    	case "gzstd":
    		var gzw *gzstd.Writer
    		if gzw, err = gzstd.NewWriterLevel(w, wlevel); err == nil {
    			closers = append(closers, gzw.Close)
    			w = gzw
    		}
    		verify = verifyGzip
    	case "dedup":
    		var ddw dedup.Writer
    		if ddw, err = dedup.NewStreamWriter(w, dedup.ModeDynamic, 8192, 1000*8192); err == nil {
    			closers = append(closers, ddw.Close)
    			w = ddw
    		}
    	case "s2":
    		const blockSize = 4 << 20
    		var sw *s2.Writer
    		switch wlevel {
    		case 0:
    			sw = s2.NewWriter(w, s2.WriterUncompressed(), s2.WriterBlockSize(blockSize), s2.WriterConcurrency(cpu))
    		case 1:
    			sw = s2.NewWriter(w, s2.WriterBlockSize(blockSize), s2.WriterConcurrency(cpu))
    		case 2:
    			sw = s2.NewWriter(w, s2.WriterBetterCompression(), s2.WriterBlockSize(blockSize), s2.WriterConcurrency(cpu))
    		case 3:
    			sw = s2.NewWriter(w, s2.WriterBestCompression(), s2.WriterBlockSize(blockSize), s2.WriterConcurrency(cpu))
    		default:
    			panic("invalid compression level")
    		}
    		w = sw
    		closers = append(closers, sw.Close)
    		verify = verifyS2
    	case "s2s":
    		var sw *s2.Writer
    		switch wlevel {
    		case 0:
    			sw = s2.NewWriter(w, s2.WriterUncompressed(), s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu))
    		case 1:
    			sw = s2.NewWriter(w, s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu))
    		case 2:
    			sw = s2.NewWriter(w, s2.WriterBetterCompression(), s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu))
    		case 3:
    			sw = s2.NewWriter(w, s2.WriterBestCompression(), s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu))
    		default:
    			panic("invalid compression level")
    		}
    		w = sw
    		closers = append(closers, sw.Close)
    		verify = verifyS2

    	case "snappy":
    		sw := snappy.NewWriter(w)
    		w = sw
    		/* case "lzo1x":
    		sw := lzo.NewWriter(w, wlevel)
    		w = sw*/
    		verify = verifyS2
    	case "flatekp":
    		// Both the stateless and the regular writer emit a deflate stream,
    		// so select the verifier before branching.
    		verify = verifyDeflate
    		if wlevel == -3 {
    			slw := flkp.NewStatelessWriter(w)
    			closers = append(closers, slw.Close)
    			w = slw
    			break
    		}
    		var fw *flkp.Writer
    		if fw, err = flkp.NewWriter(w, wlevel); err == nil {
    			closers = append(closers, fw.Close)
    			w = fw
    		}
    	case "flatestd":
    		var fw *flstd.Writer
    		if fw, err = flstd.NewWriter(w, wlevel); err == nil {
    			closers = append(closers, fw.Close)
    			w = fw
    		}
    		verify = verifyDeflate
    	case "lzma":
    		wc := lzma.WriterConfig{
    			Properties:   nil,
    			DictCap:      0,
    			BufSize:      0,
    			Matcher:      0,
    			SizeInHeader: false,
    			Size:         0,
    			EOSMarker:    true,
    		}
    		// Fail loudly: silently falling back to the uncompressed writer
    		// would produce misleading output and stats.
    		lw, lerr := wc.NewWriter(w)
    		if lerr != nil {
    			panic(lerr)
    		}
    		closers = append(closers, lw.Close)
    		w = lw
    	case "lzma2":
    		wc := lzma.Writer2Config{
    			Properties: nil,
    			DictCap:    0,
    			BufSize:    0,
    			Matcher:    0,
    		}
    		lw, lerr := wc.NewWriter2(w)
    		if lerr != nil {
    			panic(lerr)
    		}
    		closers = append(closers, lw.Close)
    		w = lw
    	case "lz4":
    		lw := lz4.NewWriter(w)
    		lw.Apply(lz4.ConcurrencyOption(cpu), lz4.CompressionLevelOption(lz4.CompressionLevel(wlevel)))
    		closers = append(closers, lw.Close)
    		w = lw
    		verify = func(r *io.PipeReader) {
    			finishVerify(lz4.NewReader(buffered(r)), "Read back OK (CRC checked)! bytes:")
    		}

    	case "zstd":
    		zw := zstd.NewWriterLevel(w, wlevel)
    		closers = append(closers, zw.Close)
    		w = zw
    		verify = verifyZstd
    	case "zskp":
    		zw, zerr := zskp.NewWriter(w, zskp.WithEncoderLevel(zskp.EncoderLevel(wlevel)), zskp.WithEncoderConcurrency(cpu))
    		if zerr != nil {
    			panic(zerr)
    		}
    		closers = append(closers, zw.Close)
    		w = zw
    		verify = verifyZstd
    	case "s2zs":
    		// Compress to a snappy-compatible stream into a pipe and convert
    		// that stream to zstd on the fly in a helper goroutine.
    		pr, pw := io.Pipe()
    		var sw *s2.Writer
    		switch wlevel {
    		case 0:
    			sw = s2.NewWriter(pw, s2.WriterUncompressed(), s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu))
    		case 1:
    			sw = s2.NewWriter(pw, s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu))
    		case 2:
    			sw = s2.NewWriter(pw, s2.WriterBetterCompression(), s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu))
    		case 3:
    			sw = s2.NewWriter(pw, s2.WriterBestCompression(), s2.WriterSnappyCompat(), s2.WriterConcurrency(cpu))
    		default:
    			// Without this, sw stays nil and the first write crashes
    			// with a nil-pointer dereference.
    			panic("invalid compression level")
    		}
    		ra := readahead.NewReader(pr)
    		conv := zskp.SnappyConverter{}
    		// convWG tracks only the converter goroutine; it is separate from
    		// the verifier wait group.
    		var convWG sync.WaitGroup
    		convWG.Add(1)
    		go func(w io.Writer) {
    			defer convWG.Done()
    			_, err := conv.Convert(ra, w)
    			pr.CloseWithError(err)
    		}(w)
    		w = sw
    		// Runs in reverse: finish the s2 stream, close the pipe, then wait
    		// for the converter to drain it.
    		closers = append(closers, closeWrap{convWG.Wait}.Close, pw.Close, sw.Close)
    		verify = verifyZstd
    	case "qlz":
    		qlw := quicklz.NewWriter(w, -wlevel)
    		closers = append(closers, qlw.Close)
    		w = qlw
    	default:
    		panic("write mode -w=x must be (raw|flatekp|flatestd|gzkp|pgzip|gzstd|none)")
    	}
    	if err != nil {
    		panic(err)
    	}

    	// An endless generator feeding a discarding writer would never finish.
    	if source && sink {
    		return
    	}
    	if verifySrc != nil {
    		wg.Add(1)
    		go verify(verifySrc)
    	}

    	// --- copy ------------------------------------------------------------

    	// directWriter is implemented by writers that can encode a complete
    	// in-memory buffer in one call.
    	type directWriter interface {
    		EncodeBuffer(buf []byte) (err error)
    	}

    	// closeAll runs the closers in reverse registration order. Errors are
    	// logged rather than dropped so a failed final flush is visible.
    	closeAll := func() {
    		for i := len(closers) - 1; i >= 0; i-- {
    			if cerr := closers[i](); cerr != nil {
    				log.Println("close:", cerr)
    			}
    		}
    	}

    	inSize := int64(0)
    	start := time.Now()

    	func() {
    		if dw, ok := w.(directWriter); ok {
    			if eb, ok := r.(*bytes.Buffer); ok {
    				inSize += int64(eb.Len())
    				if err := dw.EncodeBuffer(eb.Bytes()); err != nil {
    					panic(err)
    				}
    				closeAll()
    				return
    			}
    		}
    		nr, err := io.Copy(w, r)
    		inSize += nr
    		if err != nil && err != io.EOF {
    			panic(err)
    		}
    		closeAll()
    	}()

    	// --- report ----------------------------------------------------------

    	if !stats {
    		wg.Wait()
    		return
    	}
    	// Elapsed time is taken before waiting for the verifier.
    	elapsed := time.Since(start)
    	wg.Wait()
    	if header {
    		fmt.Printf("file\tout\tlevel\tinsize\toutsize\tmillis\tmb/s\n")
    		//fmt.Printf("file\tin\tout\tlevel\tcpu\tinsize\toutsize\tmillis\tmb/s\n")
    	}
    	mbpersec := (float64(inSize) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second)))
    	//fmt.Printf("%s\t%s\t%s\t%d\t%d\t%d\t%d\t%d\t%.02f\n", in, rmode, wmode, wlevel, cpu, inSize, outSize.n, elapsed/time.Millisecond, mbpersec)
    	fmt.Printf("%s\t%s\t%d\t%d\t%d\t%d\t%.02f\n", in, wmode, wlevel, inSize, outSize.n, elapsed/time.Millisecond, mbpersec)
    }