Quickstart

Let's use the univo npm package to load ERC20 Transfer events into our database.

First we define an indexer by providing a signing key to support secure communication, and an async function getBlock to load realtime block data from an RPC node. Next we are define an event that returns the list of ERC20 transfer events for each Ethereum block. We use a filter to define the specific blocks we want to index, in this case only the Ethereum blocks where a transaction emitted a Transfer event log. We define a handler that maps the raw block data into a list of structured transfer events. Finally, we upsert the returned events into our existing storage system like Postgres or MySQL. We then securely deploy the standard HTTP handler provided by the indexer using any of your favourite server frameworks like Next.js, Bun, or Cloudflare Workers.

import { indexer } from "univo";
import { parseAbiItem, toEventSelector } from "viem";
import type { RpcBlock, RpcTransactionReceipt } from 'viem';

// Create an indexer
const univo = indexer({ getBlock, signingKey: process.env.UNIVO_SIGNING_KEY });

async function getBlock(head: { chain: `0x${string}`; number: `0x${string}`; hash: `0x${string}` }) {
	const [eth_getBlockByHash, eth_getBlockReceipts] = await Promise.all([
		rpc({ id: 1, method: "eth_getBlockByHash", params: [head.hash, true] }),
		rpc({ id: 2, method: "eth_getBlockReceipts", params: [head.hash] })
	]);

	if (!eth_getBlockByHash) throw new Error();
	if (!eth_getBlockReceipts) throw new Error();

	return { 
        eth_chainId: head.chain, 
        eth_getBlockByHash: eth_getBlockByHash as RpcBlock<"latest", true>, 
        eth_getBlockReceipts: eth_getBlockReceipts as RpcTransactionReceipt[]
    };
}

async function rpc(opts: { id: number; method: string; params: any[] }) {
    // RPC_URL is any HTTP RPC provider like http://eth-mainnet.g.alchemy.com/v2/YOUR_API_KEY
	const res = await fetch(process.env.RPC_URL, {
		method: "POST",
		headers: { "Content-Type": "application/json" },
		body: JSON.stringify({ jsonrpc: "2.0", ...opts }),
	});

	const json: any = await res.json();

	return json.result;
}

const abi = parseAbiItem("event Transfer(address indexed from, address indexed to, uint256 value)");

// Define an event
univo.event({
    id: "transfers-erc20",

    filters: [
        {
            chain: 1, 
            fromBlock: 0, 
            event: toEventSelector(abi) // "0xddf252ad1be2c..."
        }
    ],

    handler(block) {
        return block.eth_getBlockReceipts
			.flatMap((receipt) => receipt.logs)
			.filter((log) => log.topics[0] === toEventSelector(abi))
			.map((log) => {
                try {
                    const { args } = decodeEventLog({ ...log, strict: true, abi: [abi] });

                    return {
                        id: log.blockTimestamp + log.blockNumber + log.transactionIndex + log.logIndex,
                        to_address: args.to,
                        quantity: args.value,
                        from_address: args.from,
                        token_address: log.address,
                    };
                } catch {
                    return null;
                }
			})
            .filter(Boolean);
    },

    storage: { 
        async upsert(transfers) {
            console.log(transfers);
            // [{ id "...", to_address: "...", quantity: ..., from_address: "...", token_address: "..." }]
        }
    }
})

// Deploy the fetch handler to https://yourapplication.com/api/indexer
Bun.serve({ fetch: univo.fetch, port: 3000 });

To begin delivering blocks to our indexer, we will then deploy the univo/realtime client in a container using services like AWS, Google Cloud, or Railway and point it at our deployed HTTP handler. This realtime client is responsible for establishing and maintaining a durable WebSocket connection with an RPC node and notifying our HTTP handler when new blocks are produced as well as handling retries and chain reorganisations.

import { realtime, defineTransport } from "univo/realtime";

// WSS_RPC_URL is any WebSocket RPC provider e.g. wss://eth-mainnet.g.alchemy.com/v2/YOUR_API_KEY
const transport = defineTransport(process.env.WSS_RPC_URL);

// ENDPOINT_URL is your deployed HTTP handler e.g. https://yourapplication.com/api/indexer
realtime({ transport, endpoints: [process.env.ENDPOINT_URL] });

We are now successfully loading realtime blockchain events into our own storage system. To build on this experience we have also developed the univo dashboard. You can add your indexer endpoints to view all your events, view any errors encountered when indexing, and backfill historical blocks for your events. The dashboard is free to use, the only paid feature comes from backfilling historical blocks. We perform usage based billing where you are charged per successfully delivered block. View our backfill pricing for more details.

To develop locally you can use our CLI to open the dashboard with your indexer.

$ npx univo dev http://localhost:3000

Architecture

Your infrastructure

To index data in realtime requires the deployment of at least two services: a realtime client(s) and an indexer.

Realtime client(s)

The realtime client is responsible for tracking new blocks produced for each EVM blockchain you want to index. Functionally, it does this by maintaining and establishing a WebSocket connection with an RPC node. As the chain produces new blocks, the realtime client will notify your indexer by sending a block "head" which includes the chain id, the new block number, and its corresponding block hash.

It ensures that your indexer correctly processes blocks by maintaining an internal representation of the chain. When a reorged block is encountered, the realtime client reconciles its conflicting local chain with the canonical remote chain before notifying your indexer to process any new blocks.

The realtime client is a stateful service managing WebSocket connections therefore it must be deployed in a long-running container. You are expected to deploy a realtime client for each of the chains your application wants to index. Our architecture is designed this way to give you full autonomy over your realtime blockchain indexing. This minimises vendor lock-in and ensures that the availability of your system is not dependent on any third party services.

Indexer

The indexer is responsible for loading realtime block data via the getBlock function, transforming that raw block data using your defined handler function, and upserting the returned events into your database using the provided storage adapter.

The indexer is designed to be a simple stateless service. This means it can be deployed to both serverless functions and long-running containers depending on what works best for your team.

univo infrastructure

We also run our own infrastructure to build upon the experience of indexing blockchains. Our infrastructure is responsible for features like viewing your events in the dashboard, managing and retrying errors encountered when indexing your events, and backfilling historical data at blazing-fast speeds.

Currently our infrastructure is deployed in us-east-1. We recommend aligning your indexers with this region for maximum performance when backfilling historical data.

Filters

Filters let you define the specific blocks you want to index. Blockchains are massive datasets and most of the time we are only ever interested in small portions of it. Sometimes that can be specific events like ERC20 transfers and other times it could be all events performed by a specific contract. Filters provide a simple way for an event to define exactly which blocks we want to index.

Filters reduce costs and improve backfill performance by ensuring we only index the blocks we need and ignore the blocks that don't have the data we are interested in.

By default your event will not index any blocks, you must opt-in to indexing by providing atleast one filter. Filters are composed of five properties:

  • chain number required index blocks with this chain id
  • fromBlock number required index blocks from this start block (inclusive)
  • toBlock number index blocks until this stop block (inclusive)
  • address string index blocks that involve this address
  • event string index blocks where this event topic was emitted

Each property provided in the filter operates like an AND statement. For example, if you specify an address and an event it implies that you only want to index blocks where the specific address emitted the specific event topic provided.

However, when multiple filters are defined for a given event those filters operate like an OR statement. For example, if we define a second filter looking for a different address and event our event will now index any block that matches either the first filter or the second filter.

Filters are a rudimentary method to dramatically reduce the number of blocks your application needs to index. Any advanced filtering should be performed in the event handler itself.

Handlers

When a block matches any of filters defined by your event it will be passed to the handler function to be synchronously transformed into a list of structured events.

The shape of the input block data is determined by the return value of the getBlock function. Generally, the input block is an object where each [key, value] pair corresponds to a raw RPC [method, response] returned by your blockchain node. This generic format allows you to support the full range of RPC data available to you from your node. Indexing new RPC data is as simple as modifying the getBlock function to call that method and return that data. This flexibility is important in cases where some methods are supported on specific chains only.

Note that there is a minimum set of RPC methods expected on the response from your getBlock function, notably eth_chainId, eth_getBlockByHash and eth_getBlockReceipts. These are expected so that we can safely match each block processed against your event filters.

The returned output value should be an array containing any valid JavaScript values. Each event that you return from your handler should not depend on any information outside of the input block data. It should directly map a given input (the raw block) to a given output (structured events). This ensures that your handler remains idempotent and that repeated calls with the same input block produce the same output events.

Storage

After a block is transformed into a list of structured events they are passed to your storage adapter in batches. A storage adapter defines a single asynchronous upsert function that is responsible for upserting that batch of events into your storage system.

This function must be idempotent. Functionally, this means that if the same batch of events is upserted multiple times it only produces a single set of events in your storage system.

Errors

This feature is available through our dashboard.

The dashboard displays any errors encountered when indexing your events.

  • handler_error the indexer failed to run your handler for a given block. These will not be retried, as these errors usually indicate a problem with your transformation logic.
  • upsert_error the indexer failed to run your upsert your events into storage for a given block. These will be retried using an exponential-backoff algorithm.
  • internal_error we encountered an internal error within the univo infrastructure. These are rare and indicate an issue with the univo service and not with your indexer.
  • realtime_error the indexer never received an expected block. These are recorded when a block that matches one of your defined filters is finalised on-chain and your indexer never received it. This indicates that your indexer "missed" this block and your storage system will be missing expected events. Note that these errors are sometimes recorded as a false-positive, i.e. we recorded an error even when a block was successfully received. This is a consequence of the constraints imposed when handling errors in a distributed system.

All errors can be retried through the dashboard. You can also manually retry specific blocks for an event which is useful when developing locally and quickly wanting to test a specific block is processed correctly.

Backfills

This feature is available through our dashboard.

Backfills allow you to load events into your database from historical blockchain data. They are optimised for both correctness and for performance.

In realtime, your indexer loads blocks from the getBlock function you provide. However, for historical blocks getBlock would be too slow and expensive. Blockchain RPC nodes are not developed for this kind of data access. To reduce cost and maximise performance we have developed a custom storage engine to maximise throughput.