Last active
October 20, 2023 06:59
-
-
Save ja7ad/09be29eb26b98f1a0077443f63d7e7ad to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package core | |
import ( | |
"context" | |
"encoding/json" | |
"github.com/ethereum/go-ethereum" | |
"github.com/ethereum/go-ethereum/common" | |
"github.com/ethereum/go-ethereum/core/types" | |
"github.com/ethereum/go-ethereum/ethclient" | |
"github.com/google/uuid" | |
"strings" | |
"time" | |
) | |
// Retry tuning values. Neither constant is referenced in the visible part of
// this file; presumably they are consumed elsewhere in the package.
const (
	// maxRetries is an upper bound of five attempts.
	maxRetries = 5
	// interval is a one-second duration.
	interval = time.Second
)
// Subscriber is the lifecycle contract returned by NewSubscription: a
// component that can be started and later asked to stop.
type Subscriber interface {
	// Shutdown asks the subscriber to stop. The argument is an empty
	// signal value and carries no information of its own.
	Shutdown(shutdownSignal struct{})
	// Run starts the subscriber's processing.
	Run()
}
type Subscription struct { | |
logger logger.Logger | |
publisher amqp.Publisher | |
project *config.Project | |
subscriptions map[string]map[ethereum.Subscription]chan types.Log | |
networkClients map[string]*ethclient.Client | |
shutdownSignal chan struct{} | |
} | |
func NewSubscription(ctx context.Context, rabbitmq amqp.Initiator, logger logger.Logger, project *config.Project) (Subscriber, error) { | |
pub, err := rabbitmq.Publisher(amqp.PublisherConfig{}) | |
if err != nil { | |
return nil, err | |
} | |
sub := &Subscription{ | |
publisher: pub, | |
logger: logger, | |
project: project, | |
subscriptions: make(map[string]map[ethereum.Subscription]chan types.Log), | |
networkClients: make(map[string]*ethclient.Client), | |
shutdownSignal: make(chan struct{}), | |
} | |
for _, chain := range sub.project.Chains { | |
chainClient, err := sub.createClient(chain.Address) | |
if err != nil { | |
return nil, err | |
} | |
sub.networkClients[chain.ContractAddress] = chainClient | |
query := ethereum.FilterQuery{ | |
Addresses: []common.Address{ | |
common.HexToAddress(chain.ContractAddress), | |
}, | |
} | |
logCh := make(chan types.Log) | |
subscription, err := chainClient.SubscribeFilterLogs(ctx, query, logCh) | |
if err != nil { | |
return nil, err | |
} | |
sub.subscriptions[chain.ContractAddress] = map[ethereum.Subscription]chan types.Log{ | |
subscription: logCh, | |
} | |
} | |
return sub, nil | |
} | |
func (o *Subscription) Shutdown(signal struct{}) { | |
o.shutdownSignal <- signal | |
for _, subs := range o.subscriptions { | |
for sub, _ := range subs { | |
sub.Unsubscribe() | |
} | |
} | |
for _, client := range o.networkClients { | |
client.Close() | |
} | |
} | |
func (o *Subscription) Run() { | |
unhealthyContractCh := make(chan string) | |
defer close(unhealthyContractCh) | |
for contract, sub := range o.subscriptions { | |
for subscription, logs := range sub { | |
go stream(contract, o.project.Service, o.project.GetChainByContractAddress(contract), o.logger, o.publisher, subscription, logs, unhealthyContractCh, o.shutdownSignal) | |
o.logger.Info(false, "stream has been started", "service", o.project.Service, "contract", contract) | |
} | |
} | |
for unhealthyContract := range unhealthyContractCh { | |
go func(contract string) { | |
for { | |
sub, logCh, err := o.recovery(contract) | |
if err != nil { | |
o.logger.Error(true, "failed to recovery", | |
"service", o.project.Service, | |
"contract", contract, | |
"err", err) | |
time.Sleep(2 * time.Second) | |
continue | |
} | |
go func() { | |
stream(contract, o.project.Service, o.project.GetChainByContractAddress(contract), o.logger, o.publisher, sub, logCh, unhealthyContractCh, o.shutdownSignal) | |
o.logger.Info(false, "recovery subscription has been completed", "contract", contract) | |
}() | |
break | |
} | |
}(unhealthyContract) | |
} | |
} | |
func (o *Subscription) createClient(address string) (*ethclient.Client, error) { | |
return ethclient.Dial(address) | |
} | |
func stream(contractAddress string, serviceName string, chain *config.Chain, logger logger.Logger, pub amqp.Publisher, | |
subscription ethereum.Subscription, logCh chan types.Log, unhealthyContractCh chan<- string, shutdownSignal <-chan struct{}) { | |
for { | |
select { | |
case <-shutdownSignal: | |
return | |
case err := <-subscription.Err(): | |
logger.Warn(false, "subscription got error, trying recovery subscription...", | |
"service", serviceName, | |
"network", chain.Network, | |
"err", err.Error(), | |
"contract", contractAddress) | |
close(logCh) | |
unhealthyContractCh <- contractAddress | |
return | |
case log := <-logCh: | |
if strings.ToLower(log.Address.Hex()) != strings.ToLower(chain.ContractAddress) { | |
continue | |
} | |
logger.Debug(false, "new event has been received", | |
"service", serviceName, | |
"block", log.BlockNumber, | |
"transaction", log.TxHash.Hex()) | |
b, err := json.Marshal(&goTypes.Event{ | |
ContractAddress: contractAddress, | |
TransactionHash: log.TxHash.Hex(), | |
Data: log.Data, | |
Topics: log.Topics, | |
}) | |
if err != nil { | |
logger.Error(true, "failed json marshal", | |
"service", serviceName, | |
"transaction", log.TxHash.Hex(), | |
"err", err.Error()) | |
continue | |
} | |
go func() { | |
err := pub.PublishWithRetry( | |
context.Background(), | |
config.ExchangeName, | |
serviceName+config.QueuePostfix, | |
false, | |
false, | |
amqp.Publishing{ | |
DeliveryMode: amqp.Persistent, | |
MessageId: uuid.New().String(), | |
Body: b, | |
}, | |
) | |
if err != nil { | |
logger.Error(true, err.Error()) | |
} | |
}() | |
} | |
} | |
} | |
func (o *Subscription) recovery(contract string) (ethereum.Subscription, chan types.Log, error) { | |
o.logger.Warn(false, "trying to recovery subscription...", "contract", contract) | |
delete(o.networkClients, contract) | |
delete(o.subscriptions, contract) | |
chainAddress := "" | |
for _, chain := range o.project.Chains { | |
if chain.ContractAddress == contract { | |
chainAddress = chain.Address | |
} | |
} | |
chainClient, err := o.createClient(chainAddress) | |
if err != nil { | |
return nil, nil, err | |
} | |
query := ethereum.FilterQuery{ | |
Addresses: []common.Address{ | |
common.HexToAddress(contract), | |
}, | |
} | |
logCh := make(chan types.Log) | |
subscription, err := chainClient.SubscribeFilterLogs(context.Background(), query, logCh) | |
if err != nil { | |
return nil, nil, err | |
} | |
o.networkClients[contract] = chainClient | |
o.subscriptions[contract] = map[ethereum.Subscription]chan types.Log{ | |
subscription: logCh, | |
} | |
return subscription, logCh, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment