Last active
December 14, 2022 01:53
-
-
Save filipecosta90/4325150c346e31365938d863c11d7fd0 to your computer and use it in GitHub Desktop.
Code snippet addressing question: https://github.com/RedisTimeSeries/redistimeseries-go/issues/63. Usage example (for cluster use --cluster-mode): ./radix-redistimeseries-example --host localhost:20002
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"flag" | |
"fmt" | |
"github.com/mediocregopher/radix/v3" | |
"log" | |
"strconv" | |
"time" | |
) | |
// Code snippet addressing question: https://github.com/RedisTimeSeries/redistimeseries-go/issues/63 | |
// Usage example (for cluster use --cluster-mode): ./radix-redistimeseries-example --host localhost:20002 | |
// [1 1] | |
// (...) | |
// [10 10] | |
// temperature:{area_32}:1 | |
// map[area_id:32 sensor_id:1] | |
// map[1:2 2:4] | |
// temperature:{area_32}:2 | |
// map[area_id:32 sensor_id:2] | |
// map[1:2 2:4] | |
// Program option vars: | |
var ( | |
host string | |
poolPipelineConcurrency int | |
poolPipelineWindow time.Duration | |
clusterMode bool | |
) | |
// Options: | |
func init() { | |
flag.StringVar(&host, "host", "localhost:6379", "The host:port for Redis connection") | |
flag.DurationVar(&poolPipelineWindow, "pipeline-window-ms", time.Millisecond*0, "If window is zero then implicit pipelining will be disabled") | |
flag.IntVar(&poolPipelineConcurrency, "pipeline-max-size", 0, "If limit is zero then no limit will be used and pipelines will only be limited by the specified time window") | |
flag.BoolVar(&clusterMode, "cluster-mode", false, "If set to true, it will run the client in cluster mode.") | |
flag.Parse() | |
} | |
/* | |
* Example of how to connect to RedisTimeSeries ( cluster or standalone ) and produce write/read commands | |
*/ | |
func main() { | |
if clusterMode { | |
clusterClientExample() | |
} else { | |
vanillaClientExample() | |
} | |
} | |
func clusterClientExample() { | |
var err error = nil | |
key1 := "temperature:{area_32}:1" | |
key2 := "temperature:{area_32}:2" | |
var vanillaCluster *radix.Cluster | |
poolSize := 1 | |
var rcv interface{} | |
poolOptions := radix.PoolPipelineWindow(poolPipelineWindow, poolPipelineConcurrency) | |
poolFunc := func(network, addr string) (radix.Client, error) { | |
return radix.NewPool(network, addr, poolSize, poolOptions) | |
} | |
vanillaCluster, err = radix.NewCluster([]string{host}, radix.ClusterPoolFunc(poolFunc)) | |
if err != nil { | |
log.Fatalf("Error preparing for example, while creating new connection. error = %v", err) | |
} | |
fmt.Println(fmt.Sprintf("Cluster topology: %s",vanillaCluster.Topo())) | |
// ensure keys do not exist to avoid Error replies | |
err = vanillaCluster.Do(radix.FlatCmd(nil, "DEL", key1, key2)) | |
if err != nil { | |
log.Fatalf("Error preparing for example, while issuing DEL. error = %v", err) | |
} | |
// Create the new time-series with TS.CREATE | |
// Further reference here: https://oss.redislabs.com/redistimeseries/commands/#tscreate | |
createCmd1 := radix.FlatCmd(nil, "TS.CREATE", key1, "LABELS", "sensor_id", 1, "area_id", 32) | |
createCmd2 := radix.FlatCmd(nil, "TS.CREATE", key2, "LABELS", "sensor_id", 2, "area_id", 32) | |
err = vanillaCluster.Do(createCmd1) | |
if err != nil { | |
log.Fatalf("Error preparing for example, while creating new connection. error = %v", err) | |
} | |
err = vanillaCluster.Do(createCmd2) | |
if err != nil { | |
log.Fatalf("Error preparing for example, while creating new connection. error = %v", err) | |
} | |
// Populate the time-series with TS.MADD | |
datapointTimestamps := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} | |
datapointValues := []float64{2, 4, 6, 8, 10, 12, 14, 16, 18, 20} | |
for idx, timestamp := range datapointTimestamps { | |
value := datapointValues[idx] | |
// Append new samples to a list of series. | |
// https://oss.redislabs.com/redistimeseries/commands/#tsmadd | |
maddCmd := radix.FlatCmd(&rcv, "TS.MADD", key1, timestamp, value, key2, timestamp, value) | |
err = vanillaCluster.Do(maddCmd) | |
if err != nil { | |
log.Fatalf("Error preparing while adding data points. error = %v", err) | |
} | |
fmt.Println(rcv) | |
//Output: | |
// <timestamp> | |
// <timestamp> | |
} | |
// Query a timestamp range across multiple time-series by filters. | |
// Further reference here: https://oss.redislabs.com/redistimeseries/commands/#tsmrange | |
for _, element := range vanillaCluster.Topo() { | |
elementClient, err := vanillaCluster.Client(element.Addr) | |
if err != nil { | |
log.Fatalf("Error preparing while adding data points. error = %v", err) | |
} | |
var mrangeReply []interface{} | |
mrangeCmd := radix.Cmd(&mrangeReply, "TS.MRANGE", "-", "+", "COUNT", "2", "WITHLABELS", "FILTER", "area_id=32") | |
err = elementClient.Do(mrangeCmd) | |
if err != nil { | |
log.Fatalf("Error preparing while adding data points. error = %v", err) | |
} | |
processMrangeReply(mrangeReply) | |
} | |
} | |
func vanillaClientExample() { | |
var err error = nil | |
key1 := "temperature:{area_32}:1" | |
key2 := "temperature:{area_32}:2" | |
var vanillaClient *radix.Pool | |
poolSize := 1 | |
poolOptions := radix.PoolPipelineWindow(poolPipelineWindow, poolPipelineConcurrency) | |
vanillaClient, err = radix.NewPool("tcp", host, poolSize, poolOptions) | |
if err != nil { | |
log.Fatalf("Error preparing for example, while creating new connection. error = %v", err) | |
} | |
// ensure keys do not exist to avoid Error replies | |
err = vanillaClient.Do(radix.FlatCmd(nil, "DEL", key1, key2)) | |
if err != nil { | |
log.Fatalf("Error preparing for example, while issuing DEL. error = %v", err) | |
} | |
// Create the new time-series with TS.CREATE | |
// Further reference here: https://oss.redislabs.com/redistimeseries/commands/#tscreate | |
createCmd1 := radix.FlatCmd(nil, "TS.CREATE", key1, "LABELS", "sensor_id", 1, "area_id", 32) | |
createCmd2 := radix.FlatCmd(nil, "TS.CREATE", key2, "LABELS", "sensor_id", 2, "area_id", 32) | |
err = vanillaClient.Do(createCmd1) | |
if err != nil { | |
log.Fatalf("Error preparing for example, while creating new time series. error = %v", err) | |
} | |
err = vanillaClient.Do(createCmd2) | |
if err != nil { | |
log.Fatalf("Error preparing for example, while creating new time series. error = %v", err) | |
} | |
// Populate the time-series with TS.MADD | |
datapointTimestamps := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} | |
datapointValues := []float64{2, 4, 6, 8, 10, 12, 14, 16, 18, 20} | |
for idx, timestamp := range datapointTimestamps { | |
value := datapointValues[idx] | |
var rcv []interface{} | |
// Append new samples to a list of series. | |
// https://oss.redislabs.com/redistimeseries/commands/#tsmadd | |
maddCmd := radix.FlatCmd(&rcv, "TS.MADD", key1, timestamp, value, key2, timestamp, value) | |
err = vanillaClient.Do(maddCmd) | |
if err != nil { | |
log.Fatalf("Error preparing while adding data points. error = %v", err) | |
} | |
fmt.Println(rcv) | |
//Output: | |
// <timestamp> | |
// <timestamp> | |
} | |
// Query a timestamp range across multiple time-series by filters. | |
// Further reference here: https://oss.redislabs.com/redistimeseries/commands/#tsmrange | |
var mrangeReply []interface{} | |
mrangeCmd := radix.Cmd(&mrangeReply, "TS.MRANGE", "-", "+", "COUNT", "2", "WITHLABELS", "FILTER", "area_id=32") | |
err = vanillaClient.Do(mrangeCmd) | |
if err != nil { | |
log.Fatalf("Error preparing while adding data points. error = %v", err) | |
} | |
processMrangeReply(mrangeReply) | |
} | |
// Processes the TS.MRANGE formated reply | |
// Further refereces here: | |
func processMrangeReply(mrangeReply []interface{}) { | |
//fmt.Println(reflect.TypeOf(mrangeReply)) | |
//fmt.Println(mrangeReply) | |
for _, innerArray := range mrangeReply { | |
//retrive the inner array | |
tsArrReply, ok := innerArray.([]interface{}) | |
if ok { | |
//retrieve the ts name | |
tsName := string(tsArrReply[0].([]byte)) | |
//retrieve the ts labels | |
labelsRaw := tsArrReply[1].([]interface{}) | |
labels := make(map[string]string, len(labelsRaw)) | |
for _, labelRaw := range labelsRaw { | |
kvPair := labelRaw.([]interface{}) | |
k := string(kvPair[0].([]byte)) | |
v := string(kvPair[1].([]byte)) | |
labels[k] = v | |
} | |
//retrieve the ts values | |
valuesRaw := tsArrReply[2].([]interface{}) | |
values := make(map[int64]float64, len(valuesRaw)) | |
for _, valueRaw := range valuesRaw { | |
kvPair := valueRaw.([]interface{}) | |
k := kvPair[0].(int64) | |
v, _ := strconv.ParseFloat(kvPair[1].(string), 64) | |
values[k] = v | |
} | |
fmt.Println(tsName) | |
fmt.Println(labels) | |
fmt.Println(values) | |
//Output: <name> | |
// map[labels]value | |
// map[ts]value | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment