Quickstart

npm install univo @storagesdk/core @storagesdk/adapters

Let's use the univo npm package to load ERC20 Transfer events into our database.

import { indexer } from "univo";
import { Storage } from "@storagesdk/core";
import { fs } from "@storagesdk/adapters/fs";
import { decodeEventLog, parseAbiItem, toEventSelector } from "viem";
import type { RpcBlock, RpcTransactionReceipt } from 'viem';

// univo stores metadata in any S3-compatible object storage. Normally you would use an adapter like `s3`
// for AWS or `r2` for Cloudflare. For this quickstart we will just use the filesystem for simplicity.

const metadataStorage = new Storage({
    adapter: fs({
        root: './.storage',
        folder: 'univo-metadata',
    })
});

// Create an indexer by providing a signing key to facilitate secure communication, a `getBlock` function
// to load block data in realtime from an RPC node, and a storage adapter for our indexer's metadata.
// `metadataStorage` is declared first because a `const` cannot be read before its declaration;
// `getBlock` is a hoisted function declaration, so it can be defined further down.

const univo = indexer({ 
    getBlock, 
    metadataStorage,
    signingKey: process.env.UNIVO_SIGNING_KEY
});

async function getBlock(head: { chain: `0x${string}`; number: string; }) {
	const [eth_getBlockByNumber, eth_getBlockReceipts] = await Promise.all([
		rpc({ jsonrpc: "2.0", id: 1, method: "eth_getBlockByNumber", params: [head.number, true] }),
		rpc({ jsonrpc: "2.0", id: 2, method: "eth_getBlockReceipts", params: [head.number] })
	]);

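	if (!eth_getBlockByNumber) throw new Error(`eth_getBlockByNumber returned no result for block ${head.number}`);
	if (!eth_getBlockReceipts) throw new Error(`eth_getBlockReceipts returned no result for block ${head.number}`);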

	return { 
        eth_chainId: head.chain, 
        eth_getBlockByNumber: eth_getBlockByNumber as RpcBlock<"latest", true>, 
        eth_getBlockReceipts: eth_getBlockReceipts as RpcTransactionReceipt[]
    };
}

async function rpc(opts: { jsonrpc: "2.0"; id: number; method: string; params: any[] }) {
    // RPC_URL is any HTTP RPC provider like https://eth-mainnet.g.alchemy.com/v2/YOUR_API_KEY
	const res = await fetch(process.env.RPC_URL, {
		method: "POST",
		body: JSON.stringify(opts),
		headers: { "Content-Type": "application/json" },
	});

	const json: any = await res.json();

	return json.result;
}

const abi = parseAbiItem("event Transfer(address indexed from, address indexed to, uint256 value)");

// Define an event that will upsert every ERC20 Transfer for each Ethereum block
univo.event({
    id: "transfers-erc20",

    // Use a filter to define the specific blocks we want to index. In this case we only want blocks
    // on Ethereum mainnet where a transaction emitted a `Transfer` event log
    filters: [
        {
            chain: 1, 
            fromBlock: 0, 
            event: toEventSelector(abi) // "0xddf252ad1be2c..."
        }
    ],

    // Define a handler that maps the raw block data into a list of structured transfer events
    handler(block) {
        return block.eth_getBlockReceipts
			.flatMap((receipt) => receipt.logs)
			.filter((log) => log.topics[0] === toEventSelector(abi))
			.map((log) => {
                try {
                    const { args } = decodeEventLog({ ...log, strict: true, abi: [abi] });

                    return {
                        id: log.blockTimestamp + log.blockNumber + log.transactionIndex + log.logIndex,
                        to_address: args.to,
                        quantity: args.value,
                        from_address: args.from,
                        token_address: log.address,
                    };
                } catch {
                    return null;
                }
			})
            .filter(Boolean);
    },

    // Define a storage adapter that upserts the transfer events into our storage system like Postgres.
    storage: { 
        async upsert(transfers) {
            console.log(transfers);
            // [{ id: "...", to_address: "...", quantity: ..., from_address: "...", token_address: "..." }]
        }
    }
})

// Securely deploy the standard (req: Request) => Promise<Response> `fetch` handler using any
// of your favourite frameworks like Next.js, Cloudflare Workers, or Bun
Bun.serve({ fetch: univo.fetch, port: 3000 });

To begin delivering blocks to our indexer, we deploy the univo/realtime client in a container on a service such as AWS, Google Cloud, or Railway and point it at our deployed HTTP handler. The realtime client establishes and maintains a durable WebSocket connection with an RPC node, notifies our HTTP handler when new blocks are produced, and handles retries and chain reorganisations.

import { realtime } from "univo/realtime";
import { wss, http } from "univo/transport";

// NODE_URL is any WebSocket RPC provider e.g. wss://eth-mainnet.g.alchemy.com/v2/YOUR_API_KEY
const node = wss(process.env.NODE_URL);

// INDEXER_URL is your deployed HTTP handler e.g. https://yourapplication.com/api/indexer
const indexer = http(process.env.INDEXER_URL);

realtime({ node, indexer });

We are now loading realtime blockchain events into our own storage system. To build on this, we have also developed the univo dashboard, where you can add your indexer endpoints to view all your events and backfill historical blocks. The dashboard is free to use; the only paid feature is backfilling historical blocks, which is billed by usage: you are charged per successfully delivered block. View our backfill pricing for more details.

To develop locally you can use our CLI to open the dashboard using your local indexer endpoint.

$ npx univo dev http://localhost:3000

Architecture

Indexer

The indexer is responsible for loading realtime block data via the getBlock function, transforming that raw block data using your defined handler function, and upserting the returned events into your database using the provided storage adapter.

The indexer is designed to be a simple stateless service. This means it can be deployed to both serverless functions and long-running containers depending on what works best for your team.

Realtime client(s)

The realtime client is responsible for tracking new blocks produced for each EVM blockchain you want to index. Functionally, it does this by establishing and maintaining a WebSocket connection with an RPC node. As the chain produces new blocks, the realtime client notifies your indexer by sending a block "head", which includes the chain id, the new block number, and its corresponding block hash.

It ensures that your indexer correctly processes blocks by maintaining an internal representation of the chain. When a reorged block is encountered, the realtime client reconciles its conflicting local chain with the canonical remote chain before notifying your indexer to process any new blocks.
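The reconciliation step can be pictured with a small sketch. This is not the realtime client's actual implementation: the `ChainHead` shape, the `parentHash` field, and the synchronous `getCanonical` lookup are assumptions made for illustration (a real client would query the RPC node asynchronously).

```typescript
// Hypothetical head shape. The text above lists chain id, number and hash;
// `parentHash` is an assumption added here so that a fork can be detected.
type ChainHead = { chain: string; number: number; hash: string; parentHash: string };

// Reconcile the local chain (mutated in place) with a newly announced head.
// `getCanonical` is a hypothetical lookup of the canonical head at a given height.
function reconcile(
  local: ChainHead[],
  next: ChainHead,
  getCanonical: (blockNumber: number) => ChainHead,
): { removed: ChainHead[]; added: ChainHead[] } {
  const removed: ChainHead[] = [];
  const added: ChainHead[] = [next];
  let cursor = next;

  while (local.length > 0) {
    const tip = local[local.length - 1]!;
    if (tip.number === cursor.number - 1 && tip.hash === cursor.parentHash) {
      break; // the canonical branch links onto our local chain
    }
    if (tip.number < cursor.number - 1) {
      // Gap between the local tip and the cursor: fetch the missing canonical ancestor.
      cursor = getCanonical(cursor.number - 1);
      added.unshift(cursor);
    } else {
      // The local tip is not on the canonical chain: drop it.
      removed.push(tip);
      local.pop();
    }
  }

  local.push(...added);
  return { removed, added };
}
```

In this sketch, `removed` holds the reorged heads whose events would need to be deleted, and `added` holds the canonical heads the indexer should process next, oldest first.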

The realtime client is a stateful service that manages WebSocket connections, so it must be deployed in a long-running container. You are expected to deploy a realtime client for each of the chains your application wants to index. Our architecture is designed this way to give you full autonomy over your realtime blockchain indexing. This minimises vendor lock-in and ensures that the availability of your system does not depend on any third-party service.

Filters

Filters let you define the specific blocks you want to index. Blockchains are massive datasets, and most of the time we are only interested in small portions of them. Sometimes that is specific events like ERC20 transfers; other times it is all events emitted by a specific contract. Filters provide a simple way for an event to define exactly which blocks it should index.

Filters reduce costs and improve backfill performance by ensuring we only index the blocks we need and ignore the blocks that don't have the data we are interested in.

By default your event will not index any blocks; you must opt in to indexing by providing at least one filter. Filters are composed of five properties:

  • chain (number, required): index blocks with this chain id
  • fromBlock (number, required): index blocks from this start block (inclusive)
  • toBlock (number, optional): index blocks until this stop block (inclusive)
  • address (string, optional): index blocks that involve this address
  • event (string, optional): index blocks where this event topic was emitted

Each property provided in the filter operates like an AND statement. For example, if you specify an address and an event it implies that you only want to index blocks where the specific address emitted the specific event topic provided.

However, when multiple filters are defined for a given event those filters operate like an OR statement. For example, if we define a second filter looking for a different address and event our event will now index any block that matches either the first filter or the second filter.
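The AND/OR semantics described above can be sketched as a small matching function. The `Filter` and `BlockSummary` types and both function names are hypothetical, and the sketch assumes that "involves this address" means the address emitted a log in the block; univo's own matching may be broader.

```typescript
// Hypothetical shapes for illustration; univo's internal types may differ.
type Filter = {
  chain: number;
  fromBlock: number;
  toBlock?: number;
  address?: string;
  event?: string;
};

type BlockSummary = {
  chain: number;
  number: number;
  // One entry per log emitted in the block.
  logs: { address: string; topic0: string }[];
};

// Every property present on a single filter must hold (AND).
function matchesFilter(block: BlockSummary, filter: Filter): boolean {
  if (block.chain !== filter.chain) return false;
  if (block.number < filter.fromBlock) return false;
  if (filter.toBlock !== undefined && block.number > filter.toBlock) return false;
  if (filter.address === undefined && filter.event === undefined) return true;

  // When both are given, address and event must be satisfied by the same log.
  return block.logs.some(
    (log) =>
      (filter.address === undefined || log.address.toLowerCase() === filter.address.toLowerCase()) &&
      (filter.event === undefined || log.topic0.toLowerCase() === filter.event.toLowerCase()),
  );
}

// A block is indexed when at least one filter matches (OR).
// With no filters, nothing is indexed.
function matchesAnyFilter(block: BlockSummary, filters: Filter[]): boolean {
  return filters.some((filter) => matchesFilter(block, filter));
}
```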

Filters are a rudimentary method to dramatically reduce the number of blocks your application needs to index. Any advanced filtering should be performed in the event handler itself.

Handlers

When a block matches any of the filters defined by your event, it is passed to the handler function to be synchronously transformed into a list of structured events.

The shape of the input block data is determined by the return value of the getBlock function. Generally, the input block is an object where each [key, value] pair corresponds to a raw RPC [method, response] returned by your blockchain node. This generic format allows you to support the full range of RPC data available to you from your node. Indexing new RPC data is as simple as modifying the getBlock function to call that method and return that data. This flexibility is important in cases where some methods are supported on specific chains only.

Note that there is a minimum set of RPC methods expected on the response from your getBlock function, notably eth_chainId, eth_getBlockByNumber and eth_getBlockReceipts. These are expected so that we can safely match each block processed against your event filters.
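As a rough sketch of extending getBlock with extra RPC data, the version below adds call traces alongside the three expected keys. The `createGetBlock` factory, the `Rpc` type, and the choice of `debug_traceBlockByNumber` are illustrative assumptions; that method is only available on nodes that expose the debug namespace.

```typescript
// Hypothetical JSON-RPC caller: resolves with the `result` field of the response.
type Rpc = (method: string, params: unknown[]) => Promise<unknown>;

type BlockHead = { chain: `0x${string}`; number: string };

// Builds a getBlock that returns the three expected keys plus one extra RPC method.
function createGetBlock(rpc: Rpc) {
  return async function getBlock(head: BlockHead) {
    const [block, receipts, traces] = await Promise.all([
      rpc("eth_getBlockByNumber", [head.number, true]),
      rpc("eth_getBlockReceipts", [head.number]),
      // Extra data: call traces for every transaction in the block.
      rpc("debug_traceBlockByNumber", [head.number, { tracer: "callTracer" }]),
    ]);

    if (!block || !receipts) throw new Error(`missing block data for ${head.number}`);

    return {
      eth_chainId: head.chain,
      eth_getBlockByNumber: block,
      eth_getBlockReceipts: receipts,
      // New key: a handler could now read block.debug_traceBlockByNumber.
      debug_traceBlockByNumber: traces,
    };
  };
}
```

Injecting the `rpc` function keeps the sketch independent of any particular provider and makes it easy to stub in tests.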

The returned output value should be an array containing any valid JavaScript values. Each event that you return from your handler should not depend on any information outside of the input block data. It should directly map a given input (the raw block) to a given output (structured events). This ensures that your handler remains idempotent and that repeated calls with the same input block produce the same output events.
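To illustrate what "depends only on the input block" means in practice, here is a minimal sketch of a pure handler. The `MinimalBlock` type is a trimmed-down assumption of the block shape and `pureHandler` is a hypothetical name; the point is that the event id is derived from block data rather than from `Date.now()`, a random UUID, or a database lookup.

```typescript
// Trimmed-down, hypothetical block shape; a real block carries full RPC responses.
type MinimalLog = { address: string; blockHash: string; logIndex: string };
type MinimalBlock = {
  eth_chainId: string;
  eth_getBlockReceipts: { logs: MinimalLog[] }[];
};

// A pure mapping from raw block to structured events.
function pureHandler(block: MinimalBlock) {
  return block.eth_getBlockReceipts
    .flatMap((receipt) => receipt.logs)
    .map((log) => ({
      // Built only from block data, so re-processing the same block yields the same id.
      id: `${block.eth_chainId}:${log.blockHash}:${log.logIndex}`,
      contract: log.address,
    }));
}
```

Because the id is deterministic, calling the handler twice on the same block produces identical events, which is what lets an idempotent upsert collapse retries into a single row.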

Storage

After a block is transformed into a list of structured events they are passed to your storage adapter in batches. A storage adapter defines two asynchronous methods upsert and delete:

  • upsert is responsible for upserting a given batch of events into your storage system. This function is invoked every time a new block is produced during realtime indexing or when a historical block is received during backfills. The primary rule of this function is that it must be idempotent. Functionally, this means that if the same batch of events is upserted multiple times it only produces a single set of events in your storage system.

  • delete is responsible for deleting a given batch of events from your storage system. This function is only invoked during realtime indexing when a chain reorganisation occurs. It ensures that any events previously written to storage from a block that was reorganised (no longer included in the canonical chain) are removed.

The storage adapter interface is designed to be agnostic so that you can easily support different types of off-chain storage. This allows you to use any off-chain storage that makes sense for each of your events. It is trivial to use a RDBMS like Postgres or MySQL for one event, an analytics store like ClickHouse for another, and a regular Key-Value store like Object Storage for another. By implementing the two methods above alone, we are able to correctly replicate on-chain data into your off-chain storage.
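A minimal in-memory sketch of the two methods is shown below. The `StorageAdapter` interface and `memoryStorage` helper are hypothetical names, and the sketch assumes every event carries a unique string `id`, as in the quickstart handler.

```typescript
// Assumed adapter interface: two async methods that each receive a batch of events.
interface StorageAdapter<E> {
  upsert(events: E[]): Promise<void>;
  delete(events: E[]): Promise<void>;
}

// In-memory adapter keyed by event id. `rows` is exposed for inspection only.
function memoryStorage<E extends { id: string }>(): StorageAdapter<E> & { rows: Map<string, E> } {
  const rows = new Map<string, E>();
  return {
    rows,
    // Idempotent: writing the same id again overwrites the row instead of duplicating it.
    async upsert(events: E[]) {
      for (const event of events) rows.set(event.id, event);
    },
    // Removes events that belonged to a reorganised block.
    async delete(events: E[]) {
      for (const event of events) rows.delete(event.id);
    },
  };
}
```

In Postgres the same idea maps onto `INSERT ... ON CONFLICT (id) DO UPDATE` for upsert and `DELETE ... WHERE id = ANY(...)` for delete.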

Backfills

Once you have your indexer set up and processing blocks in realtime, you'll likely want to backfill historical blocks that match the defined filters for your events. Simply navigate to our dashboard, add your HTTP indexer endpoint, and start a backfill. Backfills allow you to load events into your database from historical blockchain data and are optimised for both speed and cost.

In realtime, your indexer loads blocks from the getBlock function you provide. While it would be possible to use this function for backfills, it quickly becomes slow and expensive because blockchain RPC nodes are not designed for this kind of data access. To reduce cost and maximise performance we have developed a custom storage engine that better aligns with how your indexer processes historical data. Currently, the chains supported are:

Ethereum (Mainnet)

Note that realtime indexing supports all EVM chains and only requires a single RPC node connection. The limited support only applies to backfilling historical data and we will be adding more EVM chains soon.

Backfills use the filters provided by your events to only send you the blocks that have the on-chain events you are looking for. This takes into account the fromBlock, toBlock, event and address properties defined on your filters. Backfills can be stopped manually at any time through the dashboard. A backfill will also be stopped automatically if it encounters persistent errors with your endpoint and fails to process a block.

Pricing

Your first 10,000,000 blocks backfilled are free. After that, backfills are charged at $10 USD per million blocks.

Backfills only charge for blocks successfully delivered to your endpoint. This means any retries caused by transient failures in your endpoint to process a block are free, and you will only be charged once that block is successfully processed and your events are upserted to storage.
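As a worked example of the pricing above, the sketch below estimates a backfill bill. The function name is hypothetical, and it assumes billing is pro-rated per block rather than rounded up to whole millions, which the pricing text does not specify.

```typescript
const FREE_BLOCKS = 10_000_000;
const USD_PER_MILLION_BLOCKS = 10;

// Estimated cost in USD for a lifetime total of successfully delivered blocks.
// Assumption: billing is pro-rated per block beyond the free allowance.
function estimateBackfillCostUsd(totalDeliveredBlocks: number): number {
  const billableBlocks = Math.max(0, totalDeliveredBlocks - FREE_BLOCKS);
  return (billableBlocks / 1_000_000) * USD_PER_MILLION_BLOCKS;
}

// 25,000,000 delivered blocks: 15,000,000 billable at $10 per million.
console.log(estimateBackfillCostUsd(25_000_000)); // 150
```

Retries do not change the estimate, since only successfully delivered blocks count towards the total.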