Created
November 23, 2023 14:07
-
-
Save 0xphilipp/0097f468d34ec33801eafe108499ad75 to your computer and use it in GitHub Desktop.
sudo_callback
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use cosmwasm_std::{ | |
attr, Attribute, Binary, CosmosMsg, DepsMut, Env, Response, StdError, StdResult, Uint128, | |
}; | |
use eris::hub::{CallbackMsg, ExecuteMsg, IcaConfig, VirtualDelegation, VirtualDelegations}; | |
use eris::querier::query_balance; | |
use neutron_sdk::bindings::msg::NeutronMsg; | |
use neutron_sdk::bindings::query::NeutronQuery; | |
use crate::error::{ContractError, CustomResponse}; | |
use crate::state::State; | |
use crate::states::actionstate::{ | |
add_error_to_queue, read_action_by_sequence, AcknowledgementResult, SudoAction, | |
ACKNOWLEDGEMENT_RESULTS, | |
}; | |
use crate::states::icastate::{assert_ica_initialized, IcaConfigExt}; | |
use neutron_sdk::sudo::msg::RequestPacket; | |
pub fn sudo_response( | |
mut deps: DepsMut<NeutronQuery>, | |
env: Env, | |
request: RequestPacket, | |
_data: Binary, | |
) -> CustomResponse { | |
// deps.api.debug( | |
// format!("WASMDEBUG: sudo_response: sudo received: {:?} {:?}", request, data).as_str(), | |
// ); | |
// WARNING: RETURNING THIS ERROR CLOSES THE CHANNEL. | |
// AN ALTERNATIVE IS TO MAINTAIN AN ERRORS QUEUE AND PUT THE FAILED REQUEST THERE | |
// FOR LATER INSPECTION. | |
// In this particular case, we return an error because not having the sequence id | |
// in the request value implies that a fatal error occurred on Neutron side. | |
let seq_id = request.sequence.ok_or_else(|| StdError::generic_err("sequence"))?; | |
// WARNING: RETURNING THIS ERROR CLOSES THE CHANNEL. | |
// AN ALTERNATIVE IS TO MAINTAIN AN ERRORS QUEUE AND PUT THE FAILED REQUEST THERE | |
// FOR LATER INSPECTION. | |
// In this particular case, we return an error because not having the sequence id | |
// in the request value implies that a fatal error occurred on Neutron side. | |
let channel_id = request.source_channel.ok_or_else(|| StdError::generic_err("channel_id"))?; | |
// NOTE: NO ERROR IS RETURNED HERE. THE CHANNEL LIVES ON. | |
// In this particular example, this is a matter of developer's choice. Not being able to read | |
// the payload here means that there was a problem with the contract while submitting an | |
// interchain transaction. You can decide that this is not worth killing the channel, | |
// write an error log and / or save the acknowledgement to an errors queue for later manual | |
// processing. The decision is based purely on your application logic. | |
let payload = read_action_by_sequence(deps.storage, channel_id.clone(), seq_id).ok(); | |
let mut attrs = vec![]; | |
if let Some(payload) = payload { | |
// deps.api.debug(format!("WASMDEBUG: sudo_response: sudo payload: {:?}", payload).as_str()); | |
// WARNING: RETURNING THIS ERROR CLOSES THE CHANNEL. | |
// AN ALTERNATIVE IS TO MAINTAIN AN ERRORS QUEUE AND PUT THE FAILED REQUEST THERE | |
// FOR LATER INSPECTION. | |
// In this particular case, we return an error because not being able to parse this data | |
// that a fatal error occurred on Neutron side, or that the remote chain sent us unexpected data. | |
// Both cases require immediate attention. | |
// let _parsed_data = decode_acknowledgement_response(data)?; | |
let action: String = match payload.action.clone() { | |
SudoAction::GovVoted { | |
proposal_id, | |
} => { | |
super::gov::exec_vote_done(&mut deps, proposal_id)?; | |
"exec_vote_done".into() | |
}, | |
SudoAction::Empty { | |
source, | |
} => { | |
// nothing to do. | |
format!("noop-{0}", source) | |
}, | |
SudoAction::HarvestRewardsToHub { | |
amount, | |
} => { | |
super::harvest::exec_send_rewards_to_hub_done(&mut deps, &env, amount)?; | |
"exec_send_rewards_to_hub_done".into() | |
}, | |
SudoAction::HarvestSendDepositsToHub { | |
amount, | |
} => { | |
super::harvest::exec_send_deposits_to_hub_done(&mut deps, amount)?; | |
"exec_send_deposits_to_hub_done".into() | |
}, | |
SudoAction::DelegationsStake { | |
total, | |
delegations, | |
} => { | |
super::delegations::exec_stake_on_host_done(&mut deps, total, delegations)?; | |
"exec_stake_on_host_done".into() | |
}, | |
SudoAction::DelegationsRebalance { | |
redelegations, | |
} => { | |
super::delegations::exec_rebalance_done(&mut deps, redelegations)?; | |
"exec_rebalance_done".into() | |
}, | |
SudoAction::DelegationsSubmitBatch { | |
undelegations, | |
utoken_to_unbond, | |
ustake_burnt, | |
batch_id, | |
} => { | |
super::delegations::exec_submit_batch_done( | |
&mut deps, | |
&env, | |
batch_id, | |
utoken_to_unbond, | |
ustake_burnt, | |
undelegations, | |
)?; | |
"exec_submit_batch_done".into() | |
}, | |
SudoAction::BondingWithdrawUnbonded { | |
user, | |
} => { | |
super::bonding::withdraw_unbonded_done(&mut deps, &env, user)?; | |
"withdraw_unbonded_done".into() | |
}, | |
}; | |
attrs.push(attr("type", action)); | |
// update but also check that we don't update same seq_id twice | |
ACKNOWLEDGEMENT_RESULTS.update( | |
deps.storage, | |
(payload.port_id.clone(), seq_id), | |
|maybe_ack| -> StdResult<AcknowledgementResult> { | |
match maybe_ack { | |
Some(_ack) => Err(StdError::generic_err("update same seq_id")), | |
None => Ok(AcknowledgementResult::Success( | |
serde_json_wasm::to_string(&payload.action).unwrap_or_default(), | |
)), | |
} | |
}, | |
)?; | |
} else { | |
let error_msg = "Unable to read sudo payload"; | |
// deps.api.debug(error_msg); | |
add_error_to_queue(deps.storage, error_msg.to_string()); | |
return Ok(Response::default() | |
.add_attribute("action", "erishub/sudo_response_failed") | |
.add_attribute("error", error_msg)); | |
} | |
Ok(Response::default() | |
.add_attribute("action", "erishub/sudo_response") | |
.add_attributes(attrs) | |
.add_attribute("source", format!("{0}-{1}", channel_id, seq_id))) | |
} | |
pub fn sudo_timeout( | |
mut deps: DepsMut<NeutronQuery>, | |
env: Env, | |
request: RequestPacket, | |
) -> CustomResponse { | |
// deps.api.debug(format!("WASMDEBUG: sudo timeout request: {:?}", request).as_str()); | |
// WARNING: RETURNING THIS ERROR CLOSES THE CHANNEL. | |
// AN ALTERNATIVE IS TO MAINTAIN AN ERRORS QUEUE AND PUT THE FAILED REQUEST THERE | |
// FOR LATER INSPECTION. | |
// In this particular case, we return an error because not having the sequence id | |
// in the request value implies that a fatal error occurred on Neutron side. | |
let seq_id = request.sequence.ok_or_else(|| StdError::generic_err("sequence"))?; | |
// WARNING: RETURNING THIS ERROR CLOSES THE CHANNEL. | |
// AN ALTERNATIVE IS TO MAINTAIN AN ERRORS QUEUE AND PUT THE FAILED REQUEST THERE | |
// FOR LATER INSPECTION. | |
// In this particular case, we return an error because not having the sequence id | |
// in the request value implies that a fatal error occurred on Neutron side. | |
let channel_id = request.source_channel.ok_or_else(|| StdError::generic_err("channel_id"))?; | |
// update but also check that we don't update same seq_id twice | |
// NOTE: NO ERROR IS RETURNED HERE. THE CHANNEL LIVES ON. | |
// In this particular example, this is a matter of developer's choice. Not being able to read | |
// the payload here means that there was a problem with the contract while submitting an | |
// interchain transaction. You can decide that this is not worth killing the channel, | |
// write an error log and / or save the acknowledgement to an errors queue for later manual | |
// processing. The decision is based purely on your application logic. | |
// Please be careful because it may lead to an unexpected state changes because state might | |
// has been changed before this call and will not be reverted because of supressed error. | |
let payload = read_action_by_sequence(deps.storage, channel_id.clone(), seq_id).ok(); | |
if let Some(payload) = payload { | |
let _result = failed(&payload, &mut deps, &env); | |
// update but also check that we don't update same seq_id twice | |
ACKNOWLEDGEMENT_RESULTS.update( | |
deps.storage, | |
(payload.port_id.clone(), seq_id), | |
|maybe_ack| -> StdResult<AcknowledgementResult> { | |
match maybe_ack { | |
Some(_ack) => Err(StdError::generic_err("update same seq_id")), | |
None => Ok(AcknowledgementResult::Timeout( | |
serde_json_wasm::to_string(&payload.action).unwrap_or_default(), | |
)), | |
} | |
}, | |
)?; | |
} else { | |
let error_msg = "Unable to read sudo payload"; | |
// deps.api.debug(error_msg); | |
add_error_to_queue(deps.storage, error_msg.to_string()); | |
} | |
Ok(Response::default() | |
.add_attribute("action", "erishub/sudo_timeout") | |
.add_attribute("source", format!("{0}-{1}", channel_id, seq_id))) | |
} | |
pub fn sudo_error( | |
mut deps: DepsMut<NeutronQuery>, | |
env: Env, | |
request: RequestPacket, | |
details: String, | |
) -> CustomResponse { | |
// deps.api.debug(format!("WASMDEBUG: sudo error: {}", details).as_str()); | |
// deps.api.debug(format!("WASMDEBUG: request packet: {:?}", request).as_str()); | |
// WARNING: RETURNING THIS ERROR CLOSES THE CHANNEL. | |
// AN ALTERNATIVE IS TO MAINTAIN AN ERRORS QUEUE AND PUT THE FAILED REQUEST THERE | |
// FOR LATER INSPECTION. | |
// In this particular case, we return an error because not having the sequence id | |
// in the request value implies that a fatal error occurred on Neutron side. | |
let seq_id = request.sequence.ok_or_else(|| StdError::generic_err("sequence"))?; | |
// WARNING: RETURNING THIS ERROR CLOSES THE CHANNEL. | |
// AN ALTERNATIVE IS TO MAINTAIN AN ERRORS QUEUE AND PUT THE FAILED REQUEST THERE | |
// FOR LATER INSPECTION. | |
// In this particular case, we return an error because not having the sequence id | |
// in the request value implies that a fatal error occurred on Neutron side. | |
let channel_id = request.source_channel.ok_or_else(|| StdError::generic_err("channel_id"))?; | |
let payload = read_action_by_sequence(deps.storage, channel_id.clone(), seq_id).ok(); | |
if let Some(payload) = payload { | |
let _result = failed(&payload, &mut deps, &env); | |
// update but also check that we don't update same seq_id twice | |
ACKNOWLEDGEMENT_RESULTS.update( | |
deps.storage, | |
(payload.port_id.clone(), seq_id), | |
|maybe_ack| -> StdResult<AcknowledgementResult> { | |
match maybe_ack { | |
Some(_ack) => Err(StdError::generic_err("update same seq_id")), | |
None => Ok(AcknowledgementResult::Error(( | |
serde_json_wasm::to_string(&payload.action).unwrap_or_default(), | |
details.clone(), | |
))), | |
} | |
}, | |
)?; | |
} else { | |
let error_msg = "Unable to read sudo payload"; | |
// deps.api.debug(error_msg); | |
add_error_to_queue(deps.storage, error_msg.to_string()); | |
} | |
Ok(Response::default() | |
.add_attribute("action", "erishub/sudo_error") | |
.add_attribute("source", format!("{0}-{1}", channel_id, seq_id)) | |
.add_attribute("error", details)) | |
} | |
fn failed( | |
payload: &crate::states::actionstate::SudoPayload, | |
deps: &mut DepsMut<NeutronQuery>, | |
env: &Env, | |
) -> Result<(), crate::error::ContractError> { | |
match payload.action.clone() { | |
SudoAction::DelegationsStake { | |
total, | |
delegations, | |
} => super::delegations::exec_stake_on_host_fail(deps, total, delegations), | |
SudoAction::DelegationsSubmitBatch { | |
undelegations, | |
utoken_to_unbond, | |
ustake_burnt, | |
batch_id, | |
} => super::delegations::exec_submit_batch_fail( | |
deps, | |
env, | |
batch_id, | |
utoken_to_unbond, | |
ustake_burnt, | |
undelegations, | |
), | |
SudoAction::DelegationsRebalance { | |
redelegations, | |
} => super::delegations::exec_rebalance_fail(deps, redelegations), | |
SudoAction::BondingWithdrawUnbonded { | |
user, | |
} => super::bonding::withdraw_unbonded_fail(deps, env, user), | |
_ => { | |
// nothing to do. | |
Ok(()) | |
}, | |
} | |
} | |
pub(crate) fn sudo_query_result( | |
deps: DepsMut<NeutronQuery>, | |
env: Env, | |
query_id: u64, | |
) -> CustomResponse { | |
let state = State::default(); | |
let stake = state.stake_token.load(deps.storage)?; | |
let ica_config = assert_ica_initialized(deps.storage)?; | |
if Some(query_id) == ica_config.icq_balance_id { | |
Ok(icq_balance(ica_config, deps, env, stake, state)?) | |
} else if Some(query_id) == ica_config.icq_delegations_id { | |
Ok(icq_delegations(ica_config, deps, env, stake, state)?) | |
} else { | |
Err(ContractError::UnknownQueryId {}) | |
} | |
} | |
fn icq_balance( | |
ica_config: IcaConfig, | |
mut deps: DepsMut<'_, NeutronQuery>, | |
env: Env, | |
mut stake: eris::hub::StakeToken, | |
state: State<'_>, | |
) -> CustomResponse { | |
// will be called when the balance ICQ has updated | |
let balances = ica_config.query_ica_balances(deps.as_ref())?; | |
let mut msgs: Vec<CosmosMsg<NeutronMsg>> = vec![]; | |
let mut attrs: Vec<Attribute> = vec![]; | |
// Sending fee to collector | |
let fee = ica_config.get_fee_ica(deps.storage, &env)?; | |
let fee_balance = balances.accounts.get(&fee.get_address()?); | |
if let Some(fee_balance) = fee_balance { | |
let fee_amount = fee_balance.coins.find(&stake.utoken_host); | |
if fee_amount.amount > ica_config.min_fee_withdrawal.unwrap_or_default() { | |
attrs.push(attr("send_fee_to_collector", fee_amount.amount)); | |
msgs.push( | |
CallbackMsg::FeesToCollector { | |
amount: fee_amount.amount, | |
} | |
.to_cosmos(&env)?, | |
); | |
} | |
} | |
// Sending rewards to hub | |
let reward = ica_config.get_reward_ica(deps.storage, &env)?; | |
let reward_balance = balances.accounts.get(&reward.get_address()?); | |
if let Some(reward_balance) = reward_balance { | |
let reward_amount = reward_balance.coins.find(&stake.utoken_host); | |
if reward_amount.amount > ica_config.min_reward_restake.unwrap_or_default() { | |
attrs.push(attr("send_rewards_to_hub", reward_amount.amount)); | |
msgs.push( | |
CallbackMsg::RewardsToHub { | |
amount: reward_amount.amount, | |
} | |
.to_cosmos(&env)?, | |
); | |
} | |
} | |
// stake all utoken already in the hub. | |
if !stake.utoken_to_stake.is_zero() { | |
attrs.push(attr("stake_on_host", stake.utoken_to_stake)); | |
msgs.push(CallbackMsg::StakeOnHost {}.to_cosmos(&env)?); | |
} | |
// Sending deposits to hub | |
let balance = query_balance( | |
&deps.querier, | |
env.contract.address.clone(), | |
stake.utoken_controller.clone(), | |
)?; | |
if !balance.is_zero() { | |
attrs.push(attr("send_deposits_to_hub", balance)); | |
msgs.push(CallbackMsg::SendToHost {}.to_cosmos(&env)?); | |
} | |
// Try reconciling | |
let hub = ica_config.get_hub_ica(deps.storage, &env)?; | |
let hub_balance = balances.accounts.get(&hub.get_address()?); | |
if let Some(hub_balance) = hub_balance { | |
let hub_amount = hub_balance.coins.find(&stake.utoken_host); | |
// only reconcile if no token is being moved to hub at the moment, as synchronousity is not guaranteed. | |
// so only execute if sure that the balance either includes the stake deposits, or there are no deposits. | |
if !hub_amount.amount.is_zero() && stake.utoken_transit_to_hub.is_zero() { | |
super::reconcile::try_reconcile( | |
&state, | |
&mut stake, | |
deps.branch(), | |
&env, | |
&mut attrs, | |
hub_amount.amount, | |
)?; | |
} | |
} | |
// Submit the current batch | |
let pending_batch = state.pending_batch.load(deps.storage)?; | |
if let Some(est_unbond_start_time) = pending_batch.est_unbond_start_time { | |
let current_time = env.block.time.seconds(); | |
if current_time >= est_unbond_start_time && !pending_batch.ustake_to_burn.is_zero() { | |
attrs.push(attr("submit_batch", balance)); | |
msgs.push(ExecuteMsg::SubmitBatch {}.to_cosmos(&env)?) | |
} | |
} | |
// no changes | |
if attrs.is_empty() { | |
let old_balances = state.balances.load(deps.storage).unwrap_or_default(); | |
if old_balances == balances { | |
// if we dont have changes, we dont allow the ICQ to succeed, | |
// this allows us to keep the TX log for our contract empty, which is useful for short update periods. | |
return Err(ContractError::IcqBalanceNotChanged {}); | |
} | |
attrs.push(attr("balance_changed", "noop")); | |
} | |
state.balances.save(deps.storage, &balances)?; | |
Ok(Response::default() | |
.add_messages(msgs) | |
.add_attribute("action", "erishub/icq_balance") | |
.add_attributes(attrs)) | |
} | |
fn icq_delegations( | |
ica_config: IcaConfig, | |
deps: DepsMut<'_, NeutronQuery>, | |
_env: Env, | |
mut stake: eris::hub::StakeToken, | |
state: State<'_>, | |
) -> CustomResponse { | |
// will be called when the delegations ICQ has updated | |
let delegations = ica_config.query_ica_delegations(deps.as_ref()).map_err(|e| { | |
StdError::generic_err(format!("error parsing delegations, no delegations yet? {0}", e)) | |
})?; | |
let mut new_delegations_vec: Vec<VirtualDelegation> = vec![]; | |
for item in delegations.delegations { | |
new_delegations_vec.push((item.validator, item.amount.amount)) | |
} | |
let new_delegations = VirtualDelegations { | |
delegations: new_delegations_vec, | |
}; | |
let new_delegations_hash = new_delegations.to_hash_map(); | |
let existing_delegations = state.delegations.load(deps.storage)?.to_hash_map(); | |
if new_delegations_hash == existing_delegations { | |
return Err(ContractError::IcqDelegationsNotChanged {}); | |
} | |
state.assert_no_delegation_change_in_progress(deps.storage)?; | |
let mut reduced = Uint128::zero(); | |
let mut increased = Uint128::zero(); | |
// checking which validators are missing from current | |
let mut attrs: Vec<Attribute> = vec![]; | |
for (validator, amount) in &existing_delegations { | |
if !new_delegations_hash.contains_key(validator) { | |
attrs.push(attr("removed", validator)); | |
reduced = reduced.checked_add(*amount)?; | |
} | |
} | |
// iterate current delegations | |
for (new_delegation, amount) in &new_delegations_hash { | |
let zero = &Uint128::zero(); | |
let existing_amount = existing_delegations.get(new_delegation).unwrap_or(zero); | |
match amount.cmp(existing_amount) { | |
std::cmp::Ordering::Less => { | |
// new snapshot has less stake as before | |
let reduced_by = existing_amount - amount; | |
reduced = reduced.checked_add(reduced_by)?; | |
attrs.push(attr("reduced", format!("{0}-{1}", new_delegation, reduced_by))); | |
}, | |
std::cmp::Ordering::Equal => { | |
// everything as expected | |
}, | |
std::cmp::Ordering::Greater => { | |
// added stake | |
let increased_by = amount - existing_amount; | |
increased = increased.checked_add(increased_by)?; | |
attrs.push(attr("increased", format!("{0}+{1}", new_delegation, increased_by))); | |
}, | |
} | |
} | |
stake.total_utoken_staked = | |
stake.total_utoken_staked.checked_add(increased)?.checked_sub(reduced)?; | |
state.stake_token.save(deps.storage, &stake)?; | |
state.delegations.save(deps.storage, &new_delegations)?; | |
Ok(Response::default() | |
.add_attribute("action", "erishub/icq_delegations") | |
.add_attribute("total_reduced", reduced) | |
.add_attribute("total_increased", increased) | |
.add_attributes(attrs)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment