Skip to content

Commit 9601a6a

Browse files
committed
core: add Indexer interface and index server logic
1 parent 40072af commit 9601a6a

File tree

5 files changed

+478
-1
lines changed

5 files changed

+478
-1
lines changed

core/blockchain.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ type BlockChain struct {
303303
blockProcCounter int32
304304
scope event.SubscriptionScope
305305
genesisBlock *types.Block
306+
indexServers indexServers
306307

307308
// This mutex synchronizes chain write operations.
308309
// Readers don't need to take it, they can just read the database.
@@ -523,9 +524,15 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine,
523524
if bc.cfg.TxLookupLimit >= 0 {
524525
bc.txIndexer = newTxIndexer(uint64(bc.cfg.TxLookupLimit), bc)
525526
}
527+
bc.indexServers.init(bc)
526528
return bc, nil
527529
}
528530

531+
// RegisterIndexer registers a new indexer to the chain.
532+
func (bc *BlockChain) RegisterIndexer(indexer Indexer) {
533+
bc.indexServers.register(indexer)
534+
}
535+
529536
func (bc *BlockChain) setupSnapshot() {
530537
// Short circuit if the chain is established with path scheme, as the
531538
// state snapshot has been integrated into path database natively.
@@ -638,6 +645,7 @@ func (bc *BlockChain) loadLastState() error {
638645
if head := rawdb.ReadFinalizedBlockHash(bc.db); head != (common.Hash{}) {
639646
if block := bc.GetBlockByHash(head); block != nil {
640647
bc.currentFinalBlock.Store(block.Header())
648+
bc.indexServers.setFinalBlock(block.NumberU64())
641649
headFinalizedBlockGauge.Update(int64(block.NumberU64()))
642650
bc.currentSafeBlock.Store(block.Header())
643651
headSafeBlockGauge.Update(int64(block.NumberU64()))
@@ -685,6 +693,7 @@ func (bc *BlockChain) initializeHistoryPruning(latest uint64) error {
685693
return errors.New("unexpected database tail")
686694
}
687695
bc.historyPrunePoint.Store(predefinedPoint)
696+
bc.indexServers.setHistoryCutoff(predefinedPoint.BlockNumber)
688697
return nil
689698

690699
case history.KeepPostMerge:
@@ -706,6 +715,7 @@ func (bc *BlockChain) initializeHistoryPruning(latest uint64) error {
706715
return errors.New("unexpected database tail")
707716
}
708717
bc.historyPrunePoint.Store(predefinedPoint)
718+
bc.indexServers.setHistoryCutoff(predefinedPoint.BlockNumber)
709719
return nil
710720

711721
default:
@@ -768,9 +778,11 @@ func (bc *BlockChain) SetFinalized(header *types.Header) {
768778
if header != nil {
769779
rawdb.WriteFinalizedBlockHash(bc.db, header.Hash())
770780
headFinalizedBlockGauge.Update(int64(header.Number.Uint64()))
781+
bc.indexServers.setFinalBlock(header.Number.Uint64())
771782
} else {
772783
rawdb.WriteFinalizedBlockHash(bc.db, common.Hash{})
773784
headFinalizedBlockGauge.Update(0)
785+
bc.indexServers.setFinalBlock(0)
774786
}
775787
}
776788

@@ -1133,6 +1145,7 @@ func (bc *BlockChain) Reset() error {
11331145
// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
11341146
// specified genesis state.
11351147
func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
1148+
bc.indexServers.revert(genesis.Header())
11361149
// Dump the entire block chain and purge the caches
11371150
if err := bc.SetHead(0); err != nil {
11381151
return err
@@ -1149,6 +1162,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
11491162
log.Crit("Failed to write genesis block", "err", err)
11501163
}
11511164
bc.writeHeadBlock(genesis)
1165+
bc.indexServers.broadcast(genesis.Header(), true)
11521166

11531167
// Last update all in-memory chain markers
11541168
bc.genesisBlock = genesis
@@ -1261,6 +1275,7 @@ func (bc *BlockChain) stopWithoutSaving() {
12611275
// Stop stops the blockchain service. If any imports are currently in progress
12621276
// it will abort them using the procInterrupt.
12631277
func (bc *BlockChain) Stop() {
1278+
bc.indexServers.stop()
12641279
bc.stopWithoutSaving()
12651280

12661281
// Ensure that the entirety of the state snapshot is journaled to disk.
@@ -1562,6 +1577,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
15621577
}
15631578
}
15641579
bc.writeHeadBlock(block)
1580+
bc.indexServers.broadcast(block.Header(), true)
15651581
return nil
15661582
}
15671583

@@ -1577,7 +1593,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
15771593
// should be written atomically. BlockBatch is used for containing all components.
15781594
blockBatch := bc.db.NewBatch()
15791595
rawdb.WriteBlock(blockBatch, block)
1580-
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
1596+
blockHash := block.Hash()
1597+
bc.blockCache.Add(blockHash, block)
1598+
rawdb.WriteReceipts(blockBatch, blockHash, block.NumberU64(), receipts)
1599+
bc.receiptsCache.Add(blockHash, receipts)
15811600
rawdb.WritePreimages(blockBatch, statedb.Preimages())
15821601
if err := blockBatch.Write(); err != nil {
15831602
log.Crit("Failed to write block into disk", "err", err)
@@ -1664,6 +1683,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
16641683

16651684
// Set new head.
16661685
bc.writeHeadBlock(block)
1686+
bc.indexServers.broadcast(block.Header(), true)
16671687

16681688
bc.chainFeed.Send(ChainEvent{Header: block.Header()})
16691689
if len(logs) > 0 {
@@ -2373,6 +2393,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
23732393
return errInvalidNewChain
23742394
}
23752395
}
2396+
bc.indexServers.revert(commonBlock)
23762397
// Ensure the user sees large reorgs
23772398
if len(oldChain) > 0 && len(newChain) > 0 {
23782399
logFn := log.Info
@@ -2470,6 +2491,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error
24702491
}
24712492
// Update the head block
24722493
bc.writeHeadBlock(block)
2494+
bc.indexServers.broadcast(block.Header(), false)
24732495
}
24742496
if len(rebirthLogs) > 0 {
24752497
bc.logsFeed.Send(rebirthLogs)
@@ -2545,6 +2567,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
25452567
}
25462568
}
25472569
bc.writeHeadBlock(head)
2570+
bc.indexServers.broadcast(head.Header(), true)
25482571

25492572
// Emit events
25502573
logs := bc.collectLogs(head, false)

core/blockchain_reader.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,14 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
266266
if !ok {
267267
return nil
268268
}
269+
return bc.GetReceipts(hash, number)
270+
}
271+
272+
// GetReceipts retrieves the receipts for all transactions in a given block.
273+
func (bc *BlockChain) GetReceipts(hash common.Hash, number uint64) types.Receipts {
274+
if receipts, ok := bc.receiptsCache.Get(hash); ok {
275+
return receipts
276+
}
269277
header := bc.GetHeader(hash, number)
270278
if header == nil {
271279
return nil

0 commit comments

Comments
 (0)