以太坊索引器原理及实现

在区块链上运行复杂的查询既耗时又昂贵,为了使过程变得简单快捷,我们需要为日志或事件数据建立索引。本文将介绍以太坊索引器的工作原理并给出实现代码。

以太坊索引器原理及实现
一键发币: SOL | BNB | ETH | BASE | Blast | ARB | OP | POLYGON | AVAX | FTM | OK

有很多长期从事以太坊工作的人都知道事件是智能合约中非常重要的一部分,每当交易发生时我们都会在函数调用结束时触发一个事件,这给我们提供了一些重要的数据,这对我们有帮助 要了解交易的内部情况,我们可以使用该数据对指数期货流程进行分析或通知。

Etherscan的输出

1、索引链上数据

在开始之前我们必须了解什么是索引(indexing),它是数据库中非常有用的概念,有助于对某些字段上的数据进行排列或排序,从而使将来对数据的查询更快,从而使数据库使用更有效。

问题是为什么我们需要在区块链中建立索引,因为如果我们在区块链上运行丰富的查询,那么它既耗时又昂贵,而且为了使过程变得简单快捷,我们使用日志或事件数据并为其建立索引,以便我们可以在关闭时进行查询 -更有效地链接数据。

2、SubGraph

SubGraph 是流行的平台之一,它可以帮助项目获取事件日志数据、索引数据,并为 GraphQL 提供查询该数据的内容。

但作为一名开发人员,我们需要了解这些日志是如何工作的,以及 Subgrpah 如何了解哪个数据字段需要索引、调用哪个事件以及如何以有效格式解码数据。

3、创建自己的链上数据索引器

今天我将解释如何理解日志、解码日志、获取事件日志并为其建立索引。本文涉及的源码可以从这里获取。

为了实现这一目标,我们需要了解以下内容。

  • Nodejs:运行时
  • 数据库:存储解码的数据
  • Web3:访问以太坊的开发包
  • 合约ABI:合约的应用调用接口
  • Etherscan:以太坊区块浏览器

3.1 合约 ABI

ABI是合约中非常重要的一部分,对于这个项目来说,它可以帮助我们了解我们有多少个不同的函数或事件,合约将在事件中传递的参数是什么,以及我们必须在其上应用索引 。

在下面的屏幕截图中,你可以看到 ABI 的部分,它告诉我们有一个名为 Transfer 的事件,具有三个参数,但其中只有两个索引为 true,这意味着索引仅适用于两个字段 。

在项目中,你将在 config 文件夹下找到abi.js,将你的合约 ABI 复制到其中。

3.2 了解日志

我们如何知道从 Etherscan 上的日志中调用了哪个事件?

如果你仔细观察日志你发现一个有趣的事情,日志中的第一个哈希值实际上是事件名称及其参数数据类型的哈希值,主题索引为零,你可以自己查看。其余的其他参数是事件的参数,你可以在索引中传递最多三个参数,剩余的参数将显示在日志中的 data 字段中:

你可以通过使用其他事件名称和参数的链接来测试它,但请记住一件事,注意空格,否则哈希将不会生成:

Transfer(address,address,uint256)
Hash link: https://emn178.github.io/online-tools/keccak_256.html

3.3 从 ABI 生成签名

现在我们了解了它的工作原理,是时候查看代码以及如何将 ABI 转换为签名了。

代码如下:

var ethers = require("ethers");
var abi = require("../../config/abi"); // first we load ABI 
const fs = require("fs");
var abiSigaturePath = "./../config/event_signature.json";
var utils = ethers.utils;

/**
 * This function is used to get the ABI and convert them into its Signature
 * we used to create table and do transactions
 */
module.exports.setupABISignature = async () => {
    var allEvent = abi.contractAbi.filter(function (el) {
        // here we get only those object whose type is Event from ABI
        return el.type == "event";
    });

    var eventSingature = {};

    // {"singature":{"Name":"","fieldsName":[],"fieldsType":[],"Index":["field1","feild2"]}
    console.log("Found ", allEvent.length + " Number Of Event in ABI");
    console.log("Start Create Event Signature ....");

    allEvent.forEach((event) => {
        var tem = {};
        var isComma = false;

        var functionSignature = event.name + "(";
        tem["Name"] = event.name;
        tem["fieldsName"] = [];
        tem["fieldsType"] = [];
        tem["Index"] = [];
        event.inputs.forEach((input) => {
            // here we create event object to signature-based object
            tem["fieldsName"].push(input.name);
            tem["fieldsType"].push(input.type);
            if (isComma) {
                functionSignature += ",";
            }
            // console.log(isComma, "  functionSignature", functionSignature);
            input.indexed ? tem["Index"].push(input.name) : "";
            functionSignature += input.type;
            isComma = true;
        });
        functionSignature += ")";
        console.log("function signature", functionSignature); //
        var bytesArrary = utils.toUtf8Bytes(functionSignature);
        var signature = utils.keccak256(bytesArrary); // Transfer(address,address,unit256) -> function signature
        eventSingature[signature] = tem; // here we put all the values related to the function
    });
    fs.writeFileSync(abiSigaturePath, JSON.stringify(eventSingature)); 
    // once it is done put them into a file so we can use them
    console.log("Finish Create Event Signature .... Path", abiSigaturePath);
};

最终输出如下:

{
    "0x62e78cea01bee320cd4e420270b5ea74000d11b0c9f74754ebdbfc544b05a258": {
        "Name": "Paused",
        "fieldsName": ["account"], // list of all field 
        "fieldsType": ["address"], // there data type
        "Index": []
    },
    "0x5db9ee0a495bf2e6ff9c91a7834c1ba4fdd244a5e8aa4e537bd38aeae4b073aa": {
        "Name": "Unpaused",
        "fieldsName": ["account"],
        "fieldsType": ["address"],
        "Index": []
    },
    "0x6719d08c1888103bea251a4ed56406bd0c3e69723c8a1686e017e7bbe159b6f8": {
        "Name": "PauserAdded",
        "fieldsName": ["account"],
        "fieldsType": ["address"],
        "Index": ["account"] // this tell on field we have to apply indexing
    },
    "0xcd265ebaf09df2871cc7bd4133404a235ba12eff2041bb89d9c714a2621c7c7e": {
        "Name": "PauserRemoved",
        "fieldsName": ["account"],
        "fieldsType": ["address"],
        "Index": ["account"]
    },
    "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef": {
        "Name": "Transfer",
        "fieldsName": ["from", "to", "value"],
        "fieldsType": ["address", "address", "uint256"],
        "Index": ["from", "to"]
    },
    "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925": {
        "Name": "Approval",
        "fieldsName": ["owner", "spender", "value"],
        "fieldsType": ["address", "address", "uint256"],
        "Index": ["owner", "spender"]
    }
}

哈希值将帮助我们从日志中匹配并识别当我们从区块链加载事件时调用哪个事件:

//Code which generates Signature of EVent

https://github.com/touqeerShah/Ethereum-Event-Indexer/blob/main/event/utils/abi_to_signature.js
// final output
https://github.com/touqeerShah/Ethereum-Event-Indexer/blob/main/config/event_signature.json

3.4 从事件生成数据表

一旦 ABI 转换为签名和有意义的数据,就可以为事件生成表,以便我们可以存储数据并对其应用索引。

在创建表之前,请确保你的 PostgresDB 正在运行并且 ENV 已设置。

Env文件:

POSTGRESSDB_ADDRESS="localhost"
POSTGRESSDB_PORT="5432"
POSTGRESSDB_DB="Event"
POSTGRESSDB_PASS="password"
POSTGRESSDB_USER="admin"
PORT=8080
HOST="localhost"
INFURA_APIKEY="Infure Key"
CONTRACT_ADDRESS="your deployed contract  Address"
CHAIN_ID="1"
CHAIN_NAME="ETHEREUM_MAINNET"
STRARTFROM=//Start form where fatch block
TRANSFER_SIGNATURE="0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"

3.5 创建表的时机

下面的代码是我们根据事件参数更新的常量值,使代码动态化, basicTableStructure 包含几乎每个函数都有的一些默认字段,例如交易哈希,块号......

因此,我们创建一个模板,在模板中添加一个新字段并创建表。

// for tables
module.exports.basicTableStructure = (table, addtionalFields) => {
    return `CREATE TABLE IF NOT EXISTS ${table} (
     "transactionHash" VARCHAR(70) NOT NULL,
        "contractAddress" VARCHAR(70) NOT NULL,
        "blockNumber" Numeric NOT NULL,
     ${addtionalFields}
     PRIMARY KEY ("transactionHash")
    );`;
};
// for indexing on table
module.exports.basicIndexStructure = (table, indexField) => {
    return `CREATE INDEX ${table} 
    ON ${indexField};`;
};
// it for storing basic details of event service what was last block which we process
module.exports.eventListed = `
    CREATE TABLE IF NOT EXISTS "eventListed" (
     "chainId" VARCHAR(5) NOT NULL,
     "chainName" VARCHAR(50) NOT NULL,
     "contractAddress" VARCHAR(50) NOT NULL,
     "startBlockNumber" Numeric NOT NULL,
     "totalAmountTransfer" Numeric NOT NULL,
        PRIMARY KEY ("chainId")
    );`;
module.exports.insertEventListed = insertEventListed = (configObj) => {
    return `INSERT INTO public."eventListed" ("chainId", "chainName", "contractAddress", "startBlockNumber","totalAmountTransfer")
    VALUES ('${configObj.CHAIN_ID}', '${configObj.CHAIN_NAME}', '${configObj.CONTRACT_ADDRESS}', ${configObj.STRARTFROM},0)`;
};

module.exports.getStartingBlock = getStartingBlock = (contractAddress) => {
    return `select el."startBlockNumber"  from public."eventListed" el where "contractAddress" like '${contractAddress}'`;
};

module.exports.basicInsertStructure = (table, addtionalFields, values) => {
    return `INSERT INTO ${table} (
     "transactionHash" ,
        "contractAddress",
        "blockNumber",
     ${addtionalFields}
    ) VALUES (${values})`;
};

module.exports.updateStartBlock = (startBlockNumber, contractAddress) => {
    return `UPDATE public."eventListed"
    SET "startBlockNumber"=${startBlockNumber}
    WHERE  "contractAddress" like '${contractAddress}';
    `;
};

module.exports.updateTotalAmountTransfer = (
    newAmountTransfer,
    contractAddress
) => {
    return `UPDATE public."eventListed" el 
    SET "totalAmountTransfer"= "totalAmountTransfer"+${newAmountTransfer} 
    where "contractAddress" like '${contractAddress}';
    `;
};

module.exports.getTotalAmountTransfer = getTotalAmountTransfer = (
    contractAddress
) => {
    return `select el."totalAmountTransfer"  from public."eventListed" el where "contractAddress" like '${contractAddress}'`;
};

module.exports.getSearchTransacationHash = getSearchTransacationHash = (
    transactionHash
) => {
    return `select * from search_columns('${transactionHash}');`;
};

module.exports.setSearchColumeFunction = setSearchColumeFunction = () => {
    return `CREATE OR REPLACE FUNCTION search_columns(
        needle text,
        haystack_tables name[] default '{}',
        haystack_schema name[] default '{}'
    )
    RETURNS table(schemaname text, tablename text, columnname text, rowctid text)
    AS $$
    begin
      FOR schemaname,tablename,columnname IN
          SELECT c.table_schema,c.table_name,c.column_name
          FROM information_schema.columns c
            JOIN information_schema.tables t ON
              (t.table_name=c.table_name AND t.table_schema=c.table_schema)
            JOIN information_schema.table_privileges p ON
              (t.table_name=p.table_name AND t.table_schema=p.table_schema
                  AND p.privilege_type='SELECT')
            JOIN information_schema.schemata s ON
              (s.schema_name=t.table_schema)
          WHERE (c.table_name=ANY(haystack_tables) OR haystack_tables='{}')
            AND (c.table_schema=ANY(haystack_schema) OR haystack_schema='{}')
            AND t.table_type='BASE TABLE'
      LOOP
        FOR rowctid IN
          EXECUTE format('SELECT ctid FROM %I.%I WHERE cast(%I as text)=%L',
           schemaname,
           tablename,
           columnname,
           needle
          )
        LOOP
          -- uncomment next line to get some progress report
          -- RAISE NOTICE 'hit in %.%', schemaname, tablename;
          RETURN NEXT;
        END LOOP;
     END LOOP;
    END;
    $$ language plpgsql;`;
};

为了使用代码并创建表,我们有以下代码,我们导入之前创建的模板和签名,然后开始创建表并在表上建立索引:

var { pool } = require("../module/postgresDB");
var {
    basicTableStructure,
    basicIndexStructure,
    eventListed,
    insertEventListed,
    setSearchColumeFunction,
} = require("../constants/constant");
var event_signature = require("../config/event_signature.json");// load signature
var { configObj } = require("../config/config");
// to execute the query
const execute = async (query) => {
    try {
        await pool.query(query); // sends queries
        return true;
    } catch (error) {
        // console.error(error.stack);
        return false;
    }
};

module.exports.CreateTables = async () => {
    try {
        await pool.connect(); // gets connection
        console.log("Connected to Postgres");
    } catch (error) {
        console.log(error);
        process.exit(1);
    }
    console.log("Creating Event Table ....");
// this table is for default information about event services like contract, 
// last event block number next time start from some point
    await execute(eventListed).then(async (result) => {
        if (result) {
            console.log("Event Table created");
            result = await execute(insertEventListed(configObj));
            if (result) {
                console.log("Basic Info Added Event");
            }
        }
    });
  // get event signature
    for (const eventTabl in event_signature) {
        var tableFields = "";
        var indexField = "";
        // console.log(`${eventTabl}: ${event_signature[eventTabl]}`);
        for (const i in event_signature[eventTabl].fieldsName) {
            tableFields += `"${event_signature[eventTabl].fieldsName[i]}"`;
// based on data type in event assign table data type
            if (event_signature[eventTabl].fieldsType[i] == "uint256") {
                tableFields += " Numeric NOT NULL,";
            } else {
                tableFields += " VARCHAR(50) NOT NULL,";
            }
        }
        for (const i in event_signature[eventTabl].Index) {
            if (i > 0) {
                indexField += ",";
            }
            indexField += `"${event_signature[eventTabl].Index[i]}"`;
        }
        indexField = event_signature[eventTabl].Name + "(" + indexField + ")";
        // pass new field to template and create tables
        await execute(
            basicTableStructure(event_signature[eventTabl].Name, tableFields)
        ).then(async (result) => {
            if (result) {
                console.log(event_signature[eventTabl].Name, " Table created");
                if (event_signature[eventTabl].Index.length != 0) {
                    result = await execute(
                        basicIndexStructure(
                            "index_" + event_signature[eventTabl].Name,
                            indexField
                        )
                    );
                    if (result) {
                        console.log(
                            "Indexing Created On" +
                                event_signature[eventTabl].Name +
                                " Event"
                        );
                    }
                }
            } else {
                console.log(
                    "Table Already Exists",
                    event_signature[eventTabl].Name
                );
            }
        });
    }
    // await pool.end();
    console.log("Creating Search colume Function on Table ....");
    await execute(setSearchColumeFunction).then(async (result) => {
        if (result) {
            console.log("Search colume Function created");
        }
    });
    console.log("Done");
    process.exit(0);
};
// CreateTables();
// console.log("configObj", process.env);
https://github.com/touqeerShah/Ethereum-Event-Indexer/blob/main/constants/constant.js
https://github.com/touqeerShah/Ethereum-Event-Indexer/blob/main/utils/setupDB.js

3.6 从区块链中获得事件

现在是时候从区块链获取事件日志并基存入数据库了。步骤如下 :

  • 首先我们加载模板。
  • 然后加载ABI签名
  • 解码事件中的数据,一些数据是十六进制的,我们需要根据数据类型对它们进行解码以获得有意义的输出
  • 使用 web3 订阅带有过滤选项的日志,其中包含合约和起始区块号
  • 它基于WebSocket,因此当它发现新事件时会加载一个事件并将其存储到DB中并更新DB中的起点
// Setup: npm install alchemy-sdk
const Web3 = require("web3");

var { configObj } = require("../config/config");
var { pool, execute } = require("../module/postgresDB");
var {
    getStartingBlock,
    basicInsertStructure,
    updateStartBlock,
    updateTotalAmountTransfer,
} = require("./../constants/constant");
var event_signature = require("../config/event_signature.json");
var { decodeData } = require("./utils/dcoder");

// connect to Infure
const RPC_ENDPOINT = `wss://mainnet.infura.io/ws/v3/${configObj.INFURA_APIKEY}`;
const web3 = new Web3(RPC_ENDPOINT);
// this connect to web3 subscription with infure RPC to get real time log of chain
module.exports.event = async () => {
    await pool.connect(); // gets connection
    // here get from where to start logs with block number and it will update everytime new block is add
    // so when we stop and start again it will start from where we live
    var response = await execute(
        getStartingBlock(configObj.CONTRACT_ADDRESS),
        pool
    );
    if (!response.status) {
        console.log("Unable to fatch Record ");
        process.exit(1);
    }
    // console.log("startBlockNumber", response.result.rows[0].startBlockNumber);
    var startBlockNumber = response.result.rows[0].startBlockNumber;
    //filter which which address want to monitor and from where
    let options = {
        fromBlock: startBlockNumber,
        address: [configObj.CONTRACT_ADDRESS], //Only get events from specific addresses
        topics: [], //What topics to subscribe to
    };
    var count = 0;
    let subscription = web3.eth.subscribe(
        "logs",
        options,
        async (err, event) => {
            if (!err) {
                console.log(count, event);
                count++;
                // here we get the event log and send it to the store into DB
                await insertEventData(pool, event);
   
                var response = await execute(
                    updateStartBlock(
                        event.blockNumber,
                        configObj.CONTRACT_ADDRESS
                    ),
                    pool
                );
                if (response.status) {
                    console.log("update Starting Block Number ");
                }
                // process.exit(1);
            }
        }
    );
};

将数据存储到 DB 中的逻辑,步骤如下:

  • 首先,我们从事件对象加载基本信息,例如交易哈希和区块号。
  • 当我们讨论主题时,零是事件哈希签名,因此我们可以识别发生的事件并加载我们最早创建的签名。
  • 加载字段名称并创建用于将数据插入数据库的查询。
  • 它还检查事件的数据列中是否有任何数据,如果是,则将该数据也附加到查询中并生成最终查询。
/**
 *  This function will insert data into there respective table
 *  based on it function signature
 * @param {*} pool PostgresDB connection object
 * @param {*} eventObject //logs object
 */
async function insertEventData(pool, eventObject) {
    var tableFields = "";
    // these are common data in all tables
    var tabledata =
        `'${eventObject.transactionHash}'` +
        "," +
        `'${eventObject.address}'` +
        "," +
        `${eventObject.blockNumber}` +
        ",";
    // this is place holder for the constants which tell all the
    // what are the fields name of table which we will going to store
    for (const i in event_signature[eventObject.topics[0]].fieldsName) {
        if (i != 0) tableFields += ",";
        tableFields += `"${
            event_signature[eventObject.topics[0]].fieldsName[i]
        }"`;
    }
    // get values and decode them with based on there type
    for (let index = 1; index < eventObject.topics.length; index++) {
        var data = decodeData(
            event_signature[eventObject.topics[0]].fieldsType[index - 1],
            eventObject.topics[index]
        );
        if (index != 1) {
            tabledata += ",";
        }
        tabledata += `'${data}'`;
    }
    // if event have data values
    if (eventObject.data != "") {
        var len = event_signature[eventObject.topics[0]].fieldsType.length;
        var data = decodeData(
            event_signature[eventObject.topics[0]].fieldsType[len - 1],
            eventObject.data
        );
        if (
            event_signature[eventObject.topics[0]].fieldsType[len - 1] ==
            "uint256"
        ) {
            tabledata += `,${data}`;
            // if values if number and signature is for transfer the sum it with Total amount transfer in DB
            if (eventObject.topics[0] == configObj.TRANSFER_SIGNATURE) {
                const etherValue = Web3.utils.fromWei(data.toString(), "ether");
                console.log("etherValue", etherValue);
                var response = await execute(
                    updateTotalAmountTransfer(
                        etherValue,
                        configObj.CONTRACT_ADDRESS
                    ),
                    pool
                );
                if (response.status) {
                    console.log("update Transfer Amount");
                }
            }
        } else {
            tabledata += `,'${data}'`;
        }
    }
    // get name of table based on it signature
    var tableName = `public."${event_signature[
        eventObject.topics[0]
    ].Name.toLowerCase()}"`;
    // insert data into table
    var response = await execute(
        basicInsertStructure(tableName, tableFields, tabledata),
        pool
    );
    if (!response.status) {
        console.log(
            "Unable to Insert Record ",
            event_signature[eventObject.topics[0]],
            "Transaction hash ",
            eventObject.transactionHash
        );
        // process.exit(1);
    }
}
https://github.com/touqeerShah/Ethereum-Event-Indexer/blob/main/event/event_listener.js

3.9 Rest API

我们还有一些Rest端点,如果你想在活动事件结束后重放数据库数据,可以扩展它并根据你的需要添加新的 API。

const express = require("express");
const {
    getTotalAmount,
    verifyHashDB,
    verifyHash,
} = require("../controller/blockchainController");
const router = express.Router();
// following are the routes which we used to expose the  backend service
router.get("/getTotalAmount", getTotalAmount);
router.get("/verifyHashDB", verifyHashDB);
router.get("/verifyHash", verifyHash);

module.exports = router;
{
 "info": {
  "_postman_id": "5b6e2a48-d7d3-43a0-9bb5-4738afe5d66e",
  "name": "Task",
  "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json",
  "_exporter_id": "11496047"
 },
 "item": [
  {
   "name": "getTotalAmount",
   "request": {
    "method": "GET",
    "header": [],
    "url": {
     "raw": "http://localhost:8080/api/getTotalAmount",
     "protocol": "http",
     "host": [
      "localhost"
     ],
     "port": "8080",
     "path": [
      "api",
      "getTotalAmount"
     ]
    }
   },
   "response": []
  },
  {
   "name": "verifyHash",
   "request": {
    "method": "GET",
    "header": [],
    "url": {
     "raw": "http://localhost:8080/api/verifyHash?transactionHash=0x320b95cef4c3cfe7ffca91f8bd9e5734cdba99cafc430004a50b9f553194929a",
     "protocol": "http",
     "host": [
      "localhost"
     ],
     "port": "8080",
     "path": [
      "api",
      "verifyHash"
     ],
     "query": [
      {
       "key": "transactionHash",
       "value": "0x320b95cef4c3cfe7ffca91f8bd9e5734cdba99cafc430004a50b9f553194929a"
      }
     ]
    }
   },
   "response": []
  }
 ]
}

原文链接:Ethereum Event Indexer

DefiPlot翻译整理,转载请标明出处

通过 NowPayments 打赏