以太坊交易池分析

简介

以太坊交易池有以下功能:

  1. 缓存交易
  2. 清理交易
  3. 实现交易gasPrice竞价功能
  4. 配合出块,提供打包交易
  5. 交易查询

配置

配置描述

geth中用数据结构TxPoolConfig描述交易池配置,具体如下:

// TxPoolConfig are the configuration parameters of the transaction pool.
type TxPoolConfig struct {
    NoLocals  bool          // Whether local transaction handling should be disabled
    Journal   string        // Journal of local transactions to survive node restarts
    Rejournal time.Duration // Time interval to regenerate the local transaction journal

    PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
    PriceBump  uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)

    AccountSlots uint64 // Minimum number of executable transaction slots guaranteed per account
    GlobalSlots  uint64 // Maximum number of executable transaction slots for all accounts
    AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
    GlobalQueue  uint64 // Maximum number of non-executable transaction slots for all accounts

    Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}

对其中相关参数说明如下:

AccountSlots:每个帐户等待处理交易的最大个数。默认值为16 GlobalSlots:所有帐户等待处理交易的最大个数。默认值为4096 AccountQueue:每帐户暂不能处理交易的最大个数。默认值为64 GlobalQueue:所有帐户暂不能处理交易的最大个数。默认值为1024 Lifetime:暂不能处理交易在队列中最大保存时长。默认值为3小时

问题来了:等待处理交易与暂不能处理交易有什么不一样?

等待处理交易满足交易执行条件,暂不能处理交易不满足交易执行条件。这里的交易执行条件是指交易的nonce不能超过当前区块中的交易的nonce的值加1。

默认配置

默认配置信息保存在DefaultTxPoolConfig变量中。

// DefaultTxPoolConfig contains the default configurations for the transaction
// pool.
var DefaultTxPoolConfig = TxPoolConfig{
    Journal:   "transactions.rlp",
    Rejournal: time.Hour,

    PriceLimit: 1,
    PriceBump:  10,

    AccountSlots: 16,
    GlobalSlots:  4096,
    AccountQueue: 64,
    GlobalQueue:  1024,

    Lifetime: 3 * time.Hour,
}

交易池

描述

type TxPool struct {
    config       TxPoolConfig
    chainconfig  *params.ChainConfig
    chain        blockChain
    gasPrice     *big.Int
    txFeed       event.Feed
    scope        event.SubscriptionScope
    chainHeadCh  chan ChainHeadEvent
    chainHeadSub event.Subscription
    signer       types.Signer
    mu           sync.RWMutex 

    currentState  *state.StateDB      // Current state in the blockchain head
    pendingState  *state.ManagedState // Pending state tracking virtual nonces
    currentMaxGas uint64              // Current gas limit for transaction caps

    locals  *accountSet // Set of local transaction to exempt from eviction rules
    journal *txJournal  // Journal of local transaction to back up to disk

    pending map[common.Address]*txList   // All currently processable transactions
    queue   map[common.Address]*txList   // Queued but non-processable transactions
    beats   map[common.Address]time.Time // Last heartbeat from each known account
    all     *txLookup                    // All transactions to allow lookups
    priced  *txPricedList                // All transactions sorted by price

    wg sync.WaitGroup // for shutdown sync

    homestead bool 
}

关注一下以下几个成员就知道tx是如何保存与维护的:

  1. pending map[common.Address]*txList
  2. queue map[common.Address]*txList
  3. beats map[common.Address]time.Time
  4. all *txLookup // map类型,txhash为key,tx为value
  5. priced *txPricedList // heap维护gasPrice排序

存储

交易池不需要持久化,数据直接保存在内存中

流程

如果将交易池当作队列来看,就有以下操作:

  1. 添加
  2. 删除
  3. 更新

添加

事件源

  1. 本地用户添加交易
  2. 远端RPC调加交易

检查条件

添加动作完成很简单,重要是添加前条件检查。相关检查代码如下:

// add validates a transaction and inserts it into the non-executable queue for
// later pending promotion and execution. If the transaction is a replacement for
// an already pending or queued one, it overwrites the previous and returns this
// so outer code doesn't uselessly call promote.
//
// If a newly added transaction is marked as local, its sending account will be
// whitelisted, preventing any associated transaction from being dropped out of
// the pool due to pricing constraints.
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
    // If the transaction is already known, discard it
    hash := tx.Hash()
    if pool.all.Get(hash) != nil {
        log.Trace("Discarding already known transaction", "hash", hash)
        return false, fmt.Errorf("known transaction: %x", hash)
    }
    // If the transaction fails basic validation, discard it
    if err := pool.validateTx(tx, local); err != nil {
        log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
        invalidTxCounter.Inc(1)
        return false, err
    }
    // If the transaction pool is full, discard underpriced transactions
    if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
        // If the new transaction is underpriced, don't accept it
        if !local && pool.priced.Underpriced(tx, pool.locals) {
            log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
            underpricedTxCounter.Inc(1)
            return false, ErrUnderpriced
        }
        // New transaction is better than our worse ones, make room for it
        drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
        for _, tx := range drop {
            log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
            underpricedTxCounter.Inc(1)
            pool.removeTx(tx.Hash(), false)
        }
    }
    // If the transaction is replacing an already pending one, do directly
    from, _ := types.Sender(pool.signer, tx) // already validated
    if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
        // Nonce already pending, check if required price bump is met
        inserted, old := list.Add(tx, pool.config.PriceBump)
        if !inserted {
            pendingDiscardCounter.Inc(1)
            return false, ErrReplaceUnderpriced
        }
        // New transaction is better, replace old one
        if old != nil {
            pool.all.Remove(old.Hash())
            pool.priced.Removed()
            pendingReplaceCounter.Inc(1)
        }
        pool.all.Add(tx)
        pool.priced.Put(tx)
        pool.journalTx(from, tx)

        log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

        // We've directly injected a replacement transaction, notify subsystems
        go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})

        return old != nil, nil
    }
    // New transaction isn't replacing a pending one, push into queue
    replace, err := pool.enqueueTx(hash, tx)
    if err != nil {
        return false, err
    }
    // Mark local addresses and journal local transactions
    if local {
        pool.locals.add(from)
    }
    pool.journalTx(from, tx)

    log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
    return replace, nil
}

从上面代码可以看出,检查项还是很多,罗列如下:

  1. 重复性检查: 检查是否已经存在队列中
  2. 交易大小检查,不能超过32KB
  3. 转帐数额不能是负数
  4. gasPrice不超过当前最高限定gasPrice
  5. 交易签名检查
  6. gasPrice竞价检查
  7. Nonce检查
  8. 帐户ETH检查,防止余额不足
  9. gasLimit检查,防止gas费用不足

删除

事件源

删除交易有以下情况:

  1. 交易队列容量超出
  2. 超时删除
  3. 接收新块检查删除非法交易

更新

事件源

  1. 相同nonce更新交易
  2. 定时器检查将满足执行条件的queue队列上交易添加到pending队列

参考

  1. 以太坊如何清除已发出未打包的交易

欢迎关注

欢迎关注微信公众帐号:沉风网事(savewind)

沉风网事

Share Comments
comments powered by Disqus