@parsiq/data-lake-sdk v1.0.6

PARSIQ Data Lake SDK Quick Introduction & Reference

Introduction

Data Lakes serve as a custom, application-specific data layer for filtering, aggregation, and analysis of blockchain-scale data contained in the Tsunami data store. This SDK provides the general infrastructure for querying Tsunami, maintaining historical data, and transparently handling blockchain reorganizations as well as various configuration options, allowing you to adjust the runtime behavior to the specific needs of your application.

The Data Lake SDK supports two main types of Data Lakes: sequential and parallel.

Sequential Data Lakes are well-suited for low-latency computations with evolving state, especially when data volumes are not too large.

Parallel Data Lakes are tailored towards processing massive volumes of historical data by allowing multiple workers to process blocks independently and out-of-order. The trade-off is that they lack the capability to maintain state.

In some cases, a combination of parallel and sequential Data Lakes is the perfect solution to enable processing of both historical and real-time data, but additional processing or conversion may be needed when transitioning from historical to real-time mode.

Basics of sequential Data Lake implementation

Let's go over the steps involved in implementing a sequential Data Lake, using a simple Data Lake gathering some basic information on the CNS registry contract as a running example. We will be monitoring only the associations of tokens with URIs and current token ownership, without delving into name resolution.

The complete code for the Data Lake itself (let's call it src/datalake.ts) is listed below. We will analyze it in detail in the following sections. To make it run, follow these steps:

  1. Initialize a new package.
  2. Install the dependencies (@parsiq/datalake-sdk, @parsiq/tsunami-client, @parsiq/tsunami-client-sdk-http, @ethersproject/abi).
  3. Put the implementation in src/datalake.ts.
  4. Replace the TSUNAMI_API_KEY constant in src/datalake.ts with your actual Tsunami API key.
  5. Implement the entry point (e.g., main.ts), which should simply call run().
  6. Put together a simple Dockerfile and docker-compose.yml.
  7. Build and run.

We will provide simple examples of the files involved in the following sections, but for now let's just focus on the Data Lake itself.

import * as sdk from '@parsiq/datalake-sdk';
import { TsunamiApiClient, ChainId } from '@parsiq/tsunami-client';
import { TsunamiFilter, TsunamiEvent, DatalakeTsunamiApiClient } from '@parsiq/tsunami-client-sdk-http';
import { Interface } from '@ethersproject/abi';

// Import your ABIs here, in a format suitable for decoding using @ethersproject/abi.
import cnsRegistryAbi from './cns-registry.json';

// Put your Tsunami API key here.
const TSUNAMI_API_KEY = '';
// This is the chain ID for Ethereum mainnet Tsunami API. Change it if you want to work with a different net.
const TSUNAMI_API_NET = ChainId.ETH_MAINNET;

// CNS Registry contract address, replace or drop if you intend to monitor something else.
const CONTRACT_ADDRESS = '0xD1E5b0FF1287aA9f9A268759062E4Ab08b9Dacbe';

// topic_0 hashes of our events of interest.
const EVENT_NEW_URI_TOPIC_0 = '0xc5beef08f693b11c316c0c8394a377a0033c9cf701b8cd8afd79cecef60c3952';
const EVENT_TRANSFER_TOPIC_0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef';

// Contract deployment block is used as a starting point for the Data Lake.
const CONTRACT_DEPLOYMENT_BLOCK_NUMBER = 9_082_251;

// This defines the layout of the type-safe K-V storage.
type DatalakeStorageLayout = {
    '': any,
    domains: { uri: string },
    ownership: { owner: string },
};

type DatalakeStorageMetaLayout = {
    '': {},
    domains: {},
    ownership: {},
};

// Types for decoded events follow.
type NewUriEvent = {
    tokenId: {
        _hex: string,
    },
    uri: string,
};

type TransferEvent = {
    from: string,
    to: string,
    tokenId: {
        _hex: string,
    },
};

class Datalake extends sdk.AbstractMultiStorageDataLakeBase<DatalakeStorageLayout, DatalakeStorageMetaLayout, TsunamiFilter, TsunamiEvent> {
    private cnsRegistryDecoder: Interface;

    // Construct ABI decoders here.
    constructor() {
        super();
        this.cnsRegistryDecoder = new Interface(cnsRegistryAbi);
    }

    public override getProperties(): sdk.DataLakeProperties {
        return {
            id: 'DATALAKE-TEMPLATE',
            initialBlockNumber: CONTRACT_DEPLOYMENT_BLOCK_NUMBER,
        };
    }

    // This method generates the filter used to retrieve events from Tsunami. Filter may change from block to block.
    public async genTsunamiFilterForBlock(block: sdk.Block & sdk.DataLakeRunnerState, isNewBlock: boolean): Promise<TsunamiFilter> {
        return {
            contract: [CONTRACT_ADDRESS],
            topic_0: [EVENT_NEW_URI_TOPIC_0, EVENT_TRANSFER_TOPIC_0],
        };
    }

    // Main event handler.
    public async processTsunamiEvent(event: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void | TsunamiFilter> {
        switch (event.topic_0) {
            case EVENT_NEW_URI_TOPIC_0:
                await this.processNewUriEvent(event);
                break;
            case EVENT_TRANSFER_TOPIC_0:
                await this.processTransferEvent(event);
                break;
        }
    }

    private async processNewUriEvent(event: TsunamiEvent): Promise<void> {
        // Decodes the event...
        const fragment = this.cnsRegistryDecoder.getEvent(event.topic_0!);
        const decoded = this.cnsRegistryDecoder.decodeEventLog(fragment, event.log_data!, [
            event.topic_0!,
            event.topic_1!
        ]) as unknown as NewUriEvent;
        // ...then writes to reorganization-aware K-V storage.
        await this.set('domains', decoded.tokenId._hex, { uri: decoded.uri });
    }

    private async processTransferEvent(event: TsunamiEvent): Promise<void> {
        if (event.op_code !== 'LOG4') {
            return;
        }
        const fragment = this.cnsRegistryDecoder.getEvent(event.topic_0!);
        const decoded = this.cnsRegistryDecoder.decodeEventLog(fragment, event.log_data!, [
            event.topic_0!,
            event.topic_1!,
            event.topic_2!,
            event.topic_3!
        ]) as unknown as TransferEvent;
        await this.set('ownership', decoded.tokenId._hex, { owner: decoded.to });
    }

    // The following event handlers should be no-ops under most circumstances.
    public async processEndOfBlockEvent(event: sdk.Block & sdk.DataLakeRunnerState): Promise<void> {}
    public async processBeforeDropBlockEvent(event: sdk.DropBlockData & sdk.DataLakeRunnerState): Promise<void> {}
    public async processAfterDropBlockEvent(event: sdk.DropBlockData & sdk.DataLakeRunnerState): Promise<void> {}
}

export const run = async (): Promise<void> => {
    const logger = new sdk.ConsoleLogger();
    logger.log('DEBUG', 'Initializing datalake...');
    const datalake = new Datalake();
    logger.log('DEBUG', 'Initializing Tsunami API...');
    const tsunami = new TsunamiApiClient(TSUNAMI_API_KEY, TSUNAMI_API_NET);
    logger.log('DEBUG', 'Initializing SDK Tsunami client...');
    const tsunamiSdk = new DatalakeTsunamiApiClient(tsunami);
    logger.log('DEBUG', 'Initializing runner...');
    const runner = new sdk.MultiStorageDataLakeRunner({
        storageConfig: {
            '': { meta: {} },
            'domains': { meta: {} },
            'ownership': { meta: {} },
        },
        datalake: datalake,
        tsunami: tsunamiSdk,
        log: logger,
    });
    logger.log('DEBUG', 'Running...');
    await runner.run();
}

Key questions

Before beginning to implement your Data Lake, we recommend answering the following key questions regarding it.

What events are you interested in?

First of all, you need to determine what kind of information available in the Tsunami data store you want to process. The specific kinds of available events and filtering options depend in large part on the Tsunami client you are using. The SDK itself makes few assumptions about the Tsunami client; these are listed in detail in the reference section below. Thus, as long as the Data Lake and the Tsunami client agree on the filter and event formats, everything should be in order.

In our sample case, we want to monitor the CNS registry contract on Ethereum mainnet:

const TSUNAMI_API_NET = ChainId.ETH_MAINNET;
const CONTRACT_ADDRESS = '0xD1E5b0FF1287aA9f9A268759062E4Ab08b9Dacbe';

We are only interested in two log event types: NewURI, which indicates the minting of a new token, and Transfer, which indicates a transfer of token ownership:

const EVENT_NEW_URI_TOPIC_0 = '0xc5beef08f693b11c316c0c8394a377a0033c9cf701b8cd8afd79cecef60c3952';
const EVENT_TRANSFER_TOPIC_0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef';

The topic_0 hashes above were computed from the event signatures in the CNS registry ABI.
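
If you ever need to derive such hashes yourself, @ethersproject/abi can compute them from the ABI. A minimal sketch, assuming the sample cns-registry.json ABI listed further below:

import { Interface } from '@ethersproject/abi';
import cnsRegistryAbi from './cns-registry.json';

// getEventTopic() returns the keccak256 hash of an event's signature, which is
// exactly the value that appears as topic_0 in the raw log events.
const iface = new Interface(cnsRegistryAbi);
console.log(iface.getEventTopic('NewURI'));   // should print EVENT_NEW_URI_TOPIC_0
console.log(iface.getEventTopic('Transfer')); // should print EVENT_TRANSFER_TOPIC_0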

The CNS registry contract was deployed at block 9,082,251, so we will not need to look at any preceding blocks:

const CONTRACT_DEPLOYMENT_BLOCK_NUMBER = 9_082_251;

Since the Tsunami client serves raw events, we will want to decode them using the contract ABI, e.g., with @ethersproject/abi:

import { Interface } from '@ethersproject/abi';
import cnsRegistryAbi from './cns-registry.json';

The sample ABI file can be found in the section below.

We also have to define the types for decoded log events:

type NewUriEvent = {
    tokenId: {
        _hex: string,
    },
    uri: string,
};

type TransferEvent = {
    from: string,
    to: string,
    tokenId: {
        _hex: string,
    },
};

So far, we have been defining only types and constants. We will start to put all of this together once we have answered all the key questions about our Data Lake.

How do you want to process the data?

You can perform more or less arbitrary computations on the events you are processing; the limiting factor is that processing must be fast enough to keep up with the block rate of the chain in question.

In the case of our example Data Lake, we won't be doing much with the data; we just want to store the information on new tokens and transfers of ownership.

Having determined that, we don't need to write any code just yet, so we can move on to the next question.

How do you want to store the results?

Data Lakes can produce arbitrary side effects while processing events, but the main result of their work is supposed to be a data store of processed data that is easy to query from consumer apps.

The SDK supports either plain old PostgreSQL as a storage backend or PostgreSQL-based Citus, which allows for a substantial degree of horizontal scalability. The storage is organized into any number of independent, type-safe key-value storages that maintain both the current state and the complete history of changes. The SDK rolls the state back automatically in case of blockchain reorganizations. While the default storage model is document-based, the SDK supports the inclusion of additional relational fields and indices in the storage tables. This is mostly needed for advanced use cases, so we will not be using this capability in our example Data Lake.

As we have already mentioned, we want to keep track of mappings of tokens to URIs, and of current ownership information for each token. Hence, we are going to need two fairly simple storages:

type DatalakeStorageLayout = {
    '': any,
    domains: { uri: string }, // Keyed by token ID.
    ownership: { owner: string }, // Keyed by token ID.
};

Here, the domains storage maps token IDs (keys) to URIs, while the ownership storage maps token IDs to owner addresses. The SDK makes sure that the current state of these two storages is represented by the domains_entities and ownership_entities tables on the storage backend, while historical information is kept in the domains_mutations and ownership_mutations tables.
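
For instance, a consumer app could query the current state of the domains storage directly with the pg driver. A hedged sketch, assuming the entity_key/entity_value column names that appear in the examples further below and the standard PG* connection variables:

import { Pool } from 'pg';

// Connection parameters are picked up from the standard PG* environment variables.
const pool = new Pool();

// Looks up the URI currently associated with a token ID (the storage key).
async function getDomainUri(tokenId: string): Promise<string | undefined> {
    const res = await pool.query(
        'SELECT entity_value FROM domains_entities WHERE entity_key = $1',
        [tokenId],
    );
    return res.rows[0]?.entity_value?.uri;
}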

The SDK also requires us to always define a default storage with an empty string as its name, but we are simply leaving it unused in our example.

While we won't be using custom fields in our Data Lake, we still have to define a type declaring just that:

type DatalakeStorageMetaLayout = {
    '': {},
    domains: {},
    ownership: {},
};

Implementing the Data Lake itself

Now we are ready to implement the Data Lake as follows:

class Datalake extends sdk.AbstractMultiStorageDataLakeBase<DatalakeStorageLayout, DatalakeStorageMetaLayout, TsunamiFilter, TsunamiEvent> {

We are extending a convenient base class exposed by the SDK here. The filter and event types for the Tsunami client are imported from our client package. The types defining the storage layout are the ones we declared above while answering the key questions about the Data Lake.

We need to initialize the event decoder in the constructor:

    private cnsRegistryDecoder: Interface;

    constructor() {
        super();
        this.cnsRegistryDecoder = new Interface(cnsRegistryAbi);
    }

We also have to define the method providing the fundamental properties of our Data Lake, namely its name and starting block:

public override getProperties(): sdk.DataLakeProperties {
    return {
        id: 'DATALAKE-TEMPLATE',
        initialBlockNumber: CONTRACT_DEPLOYMENT_BLOCK_NUMBER,
    };
}

The next method specifies the filter that we want to use to retrieve events from Tsunami:

public async genTsunamiFilterForBlock(block: sdk.Block & sdk.DataLakeRunnerState, isNewBlock: boolean): Promise<TsunamiFilter> {
    return {
        contract: [CONTRACT_ADDRESS],
        topic_0: [EVENT_NEW_URI_TOPIC_0, EVENT_TRANSFER_TOPIC_0],
    };
}

This method may generate the filter dynamically as it can both inspect and update the state of the Data Lake, doubling as the event handler for new blocks.

Note that in case your implementation of genTsunamiFilterForBlock() updates the state, you MUST NOT alter the state if the isNewBlock parameter is false. genTsunamiFilterForBlock() is also called when the events are pre-fetched in bulk for multiple blocks. Unless your state update logic is idempotent with respect to processing the same new_block event multiple times, failing to respect the value of isNewBlock may corrupt your state.
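
As an illustration of that rule, the sketch below (not part of our CNS example) performs some bookkeeping in the default storage and guards it with isNewBlock; the key and value shape are made up for the purpose of the example:

public async genTsunamiFilterForBlock(block: sdk.Block & sdk.DataLakeRunnerState, isNewBlock: boolean): Promise<TsunamiFilter> {
    if (isNewBlock) {
        // Safe: this branch runs only when a genuinely new block arrives, not
        // during bulk pre-fetching, so the write cannot be applied twice.
        await this.set('', 'last-new-block', { processedAt: Date.now() });
    }
    return {
        contract: [CONTRACT_ADDRESS],
        topic_0: [EVENT_NEW_URI_TOPIC_0, EVENT_TRANSFER_TOPIC_0],
    };
}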

The simple Tsunami client used in this example provides the capabilities to retrieve log events by contract address and topics, among other things.

We also have to define the event handler for Tsunami events:

public async processTsunamiEvent(event: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void | TsunamiFilter> {
    switch (event.topic_0) {
        case EVENT_NEW_URI_TOPIC_0:
            await this.processNewUriEvent(event);
            break;
        case EVENT_TRANSFER_TOPIC_0:
            await this.processTransferEvent(event);
            break;
    }
}

Here we just dispatch to separate handlers depending on topic_0.

Note that while genTsunamiFilterForBlock() cannot change the filter mid-block, processTsunamiEvent() can. However, if you generate many filter updates mid-block, the runner repeatedly re-queries the Tsunami API for the block's events, resulting in worst-case quadratic complexity both time- and network-wise.
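
For completeness, here is a hypothetical variant of the handler that does return an updated filter mid-block; ANOTHER_CONTRACT_ADDRESS is an illustrative constant, not part of the CNS example:

public async processTsunamiEvent(event: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void | TsunamiFilter> {
    if (event.topic_0 === EVENT_TRANSFER_TOPIC_0) {
        await this.processTransferEvent(event);
        return;
    }
    if (event.topic_0 === EVENT_NEW_URI_TOPIC_0) {
        await this.processNewUriEvent(event);
        // Returning a filter makes the runner re-query the remaining events of
        // the current block with the widened filter (at the cost noted above).
        return {
            contract: [CONTRACT_ADDRESS, ANOTHER_CONTRACT_ADDRESS],
            topic_0: [EVENT_NEW_URI_TOPIC_0, EVENT_TRANSFER_TOPIC_0],
        };
    }
}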

The handler for NewURI events is trivial: we just decode the raw event and update the domains K-V storage:

private async processNewUriEvent(event: TsunamiEvent): Promise<void> {
    const fragment = this.cnsRegistryDecoder.getEvent(event.topic_0!);
    const decoded = this.cnsRegistryDecoder.decodeEventLog(fragment, event.log_data!, [
        event.topic_0!,
        event.topic_1!
    ]) as unknown as NewUriEvent;
    await this.set('domains', decoded.tokenId._hex, { uri: decoded.uri });
}

We are only writing to the storage here, but the event handlers may also inspect it using the methods described in the reference section below.

It is important that you await all the state-updating method calls in your event handlers. Failing to do so may lead to undefined behavior.

The set() method is often convenient but not completely type-safe. Its fourth argument, providing the data to be written to separate fields associated with the specified key, is always optional. This is fine when we are not using those fields, but it will crash at run time if we do use them and fail to provide the values. setRecord() is a more cumbersome but type-safe alternative.

The handler for Transfer events is pretty much the same:

private async processTransferEvent(event: TsunamiEvent): Promise<void> {
    if (event.op_code !== 'LOG4') {
        return;
    }
    const fragment = this.cnsRegistryDecoder.getEvent(event.topic_0!);
    const decoded = this.cnsRegistryDecoder.decodeEventLog(fragment, event.log_data!, [
        event.topic_0!,
        event.topic_1!,
        event.topic_2!,
        event.topic_3!
    ]) as unknown as TransferEvent;
    await this.set('ownership', decoded.tokenId._hex, { owner: decoded.to });
}

We are almost done. The SDK provides some additional event handlers for block boundaries and reorganizations, but we can leave their implementations empty in this case:

public async processEndOfBlockEvent(event: sdk.Block & sdk.DataLakeRunnerState): Promise<void> {}
public async processBeforeDropBlockEvent(event: sdk.DropBlockData & sdk.DataLakeRunnerState): Promise<void> {}
public async processAfterDropBlockEvent(event: sdk.DropBlockData & sdk.DataLakeRunnerState): Promise<void> {}
}

Our Data Lake is ready to roll.

Calling the Data Lake runner provided by the SDK

Invoking the runner is easy enough: we just have to construct the objects that it needs, such as the Data Lake instance, the Tsunami client, etc.

The storageConfig argument is required for proper database schema initialization.

export const run = async (): Promise<void> => {
    const logger = new sdk.ConsoleLogger();
    logger.log('DEBUG', 'Initializing datalake...');
    const datalake = new Datalake();
    logger.log('DEBUG', 'Initializing Tsunami API...');
    const tsunami = new TsunamiApiClient(TSUNAMI_API_KEY, TSUNAMI_API_NET);
    logger.log('DEBUG', 'Initializing SDK Tsunami client...');
    const tsunamiSdk = new DatalakeTsunamiApiClient(tsunami);
    logger.log('DEBUG', 'Initializing runner...');
    const runner = new sdk.MultiStorageDataLakeRunner({
        storageConfig: {
            '': { meta: {} },
            'domains': { meta: {} },
            'ownership': { meta: {} },
        },
        datalake: datalake,
        tsunami: tsunamiSdk,
        log: logger,
    });
    logger.log('DEBUG', 'Running...');
    await runner.run();
}

Configuring the runner

Using the following docker-compose.yml will run the Data Lake with sensible settings. Take a look at the environment variables it passes to the runner's container. The reference section below contains a detailed list of all the available configuration options.

version: '3.8'

services:
  datalake_runner:
    container_name: datalake_template_container
    environment:
      PGHOST: pg
      PGPORT: 5432
      PGDATABASE: datalake
      PGUSER: datalake
      PGPASSWORD: datalake
      DLSDK_SHOW_CONFIG: "true"
      # Set DLSDK_VERBOSE to "false" and DLSDK_NO_DEBUG to "true" for less console output.
      DLSDK_VERBOSE: "true"
      DLSDK_NO_DEBUG: "false"
      #DLSDK_VERBOSE: "false"
      #DLSDK_NO_DEBUG: "true"
      DLSDK_POLL_TSUNAMI: "true"
      DLSDK_NO_RMQ: "true"
      # DLSDK_RECOVERY_BLOCK_FRAME controls the number of blocks retrieved at once in historical/batch/recovery mode.
      DLSDK_RECOVERY_BLOCK_FRAME: 200
      DLSDK_FRAME_LEVEL_ISOLATION: "true"
      DLSDK_MEMORY_CACHE: "true"
      DLSDK_RECREC_SLEEP_MS: 100
      DLSDK_RECREC_ATTEMPTS: 5
      DLSDK_MODE_MUTEX_DELAY_MS: 10
      DLSDK_PULL_SLEEP_MS: 100
      DLSDK_PULL_AWAKEN_MS: 500
      DLSDK_FORCE_GC: 5
      DLSDK_LOG_MEMORY_USAGE: 5000
    depends_on:
      - pg
    image: "datalake-template-dev:latest"
    # Drop the '--reset' CLI option here if you don't want to reset the datalake's state on each run.
    entrypoint: /bin/bash ./run.sh --reset
    #entrypoint: /bin/bash ./run.sh

  pg:
    container_name: postgres_datalake_container
    image: postgres
    environment:
      POSTGRES_DB: datalake
      POSTGRES_USER: datalake
      POSTGRES_PASSWORD: datalake
    ports:
      - "54329:5432"
    volumes:
      - ./volumes/postgres/:/var/lib/postgresql/data
    stop_grace_period: 1m

Sample misc. files for the CNS registry Data Lake

This section contains samples for the rest of the files needed to create a complete Data Lake.

Dockerfile

FROM node:16-alpine AS datalake-template-dev
RUN apk add bash
RUN npm install --global wait-port
RUN mkdir -p /home/node/runner && chown -R node:node /home/node/runner
USER node
WORKDIR /home/node/runner
RUN mkdir -p /home/node/runner/dist
COPY --chown=node:node package.json package-lock.json ./
COPY --chown=node:node local_packages/ ./local_packages/
RUN npm ci --ignore-scripts
COPY --chown=node:node tsconfig.json run.sh main.ts ./
COPY --chown=node:node src/ ./src/
RUN npx tsc
RUN rm -rf package.json package-lock.json local_packages/ tsconfig.json main.ts src/

main.ts

import { run } from './src/datalake';

run();

run.sh

#!/bin/bash
wait-port pg:5432
node --expose-gc --optimize-for-size dist/main.js "$@"

src/cns-registry.json

[
    {
        "anonymous":false,
        "inputs":[
            {
                "indexed":true,
                "internalType":"uint256",
                "name":"tokenId",
                "type":"uint256"
            },
            {
                "indexed":false,
                "internalType":"string",
                "name":"uri",
                "type":"string"
            }
        ],
        "name":"NewURI",
        "type":"event"
    },
    {
        "anonymous":false,
        "inputs":[
            {
                "indexed":true,
                "internalType":"address",
                "name":"from",
                "type":"address"
            },
            {
                "indexed":true,
                "internalType":"address",
                "name":"to",
                "type":"address"
            },
            {
                "indexed":true,
                "internalType":"uint256",
                "name":"tokenId",
                "type":"uint256"
            }
        ],
        "name":"Transfer",
        "type":"event"
    }
]

Basics of parallel Data Lake implementation

Advantages and limitations of parallel Data Lakes

Parallel Data Lakes are designed to achieve high performance while churning through historical data by throwing multiple independent workers at a single task. This allows you to leverage the available hardware resources to increase throughput, with the storage backend usually becoming the limiting factor to performance gains.

Naturally, this comes at a price. Since blocks and events can and will be processed out of order and simultaneously by separate workers, no meaningful concept of evolving state can be computed at the parallel stage. In algebraic terms, the state updates performed by the Data Lake workers should be commutative and associative, so that any computation order produces the same result, up to equivalence from the eventual data consumer's standpoint. Ideally, the computations should also be idempotent, which improves data consistency guarantees and makes it easier to recover from failures.

Once again, since blocks are processed out of order, the history of state is not maintained and reorganizations are not supported, which limits parallel Data Lakes to operating on historical data. This means that if you need to continue in real-time mode, you will need to implement a separate sequential Data Lake and potentially a custom stage for converting the data collected at the parallel stage into a more suitable form. You can think of this as the 'reduce' step to the parallel Data Lake's 'map' step. The interfaces of parallel and sequential Data Lakes are somewhat dissimilar, but it is usually possible to achieve a high degree of code reuse by implementing a helper class employed by both, as sketched below.
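
One way this reuse often plays out in practice is to keep the decoding and aggregation logic in a plain helper class with no SDK dependencies and have both Data Lakes delegate to it from their event handlers. A hedged sketch with illustrative names:

import { Interface } from '@ethersproject/abi';

type OwnershipChange = { tokenId: string, owner: string };

// A plain helper with no SDK dependencies: both the parallel (historical) and
// the sequential (real-time) Data Lake can construct one and call it from
// their respective event handlers.
class TransferDecoder {
    constructor(private readonly abi: Interface) {}

    decode(topics: string[], logData: string): OwnershipChange {
        const fragment = this.abi.getEvent(topics[0]);
        const decoded = this.abi.decodeEventLog(fragment, logData, topics);
        return { tokenId: decoded.tokenId._hex, owner: decoded.to };
    }
}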

Implementing a parallel Data Lake

While the fundamental concepts of parallel Data Lakes are very similar to those underlying sequential Data Lakes, there are important differences in both the running mode of parallel Data Lakes and the capabilities exposed to them by the SDK.

We will be using a simplified version of the Uniswap V2 Data Lake as a running example here. The aim is to collect swap turnover data for all the deployed pairs.

Basic concepts of the parallel Data Lake runner

For the purposes of work distribution and sequencing, the parallel Data Lake SDK employs the following three concepts:

  • Tasks
  • Task groups
  • Filter groups

Tasks

A task is a small unit of work, consisting of a range of blocks (typically a few hundred to a few thousand) and associated with a specific filter group. Tasks are distributed to workers, which are effectively sequential Data Lake runners operating in batch mode with a few performance-related tweaks.

Task groups

The primary purpose of task groups is to reduce the probability of cross-worker deadlocks when using plain old PostgreSQL as a storage backend. This is achieved by organizing the data into multiple sets of tables using the same schema, each of which is associated with a single task group. Workers never take more than one task from any task group at the same time, ensuring that every worker is writing to its own set of tables.

Filter groups

Filter groups serve to separate heterogeneous workloads into homogeneous tasks. A filter group is identified by a unique string, and it associates block ranges with the Tsunami filters to be employed by workers processing tasks belonging to that group. New filter groups can be created at run-time, causing the creation of new tasks. However, existing filter groups must not be altered; the runner's behavior is undefined in that case.

For example, the Uniswap Data Lake organizes its work into two filter groups: factory (responsible for monitoring the factory contract to identify the deployed pools) and pairs (responsible for monitoring Swap events).

Data Lake design

All the initial steps in implementing parallel Data Lakes, such as obtaining a Tsunami client and designing the storage layout, are the same as for sequential Data Lakes. As far as the storage layout is concerned, remember that your updates should be commutative and associative, and keep that in mind while designing the schema. It is also advantageous to make your state updates:

  • Insert-only (never updating existing keys)
  • Free of reads

For Uniswap, we will use a layout with two separate storages: one for the turnover data and one for the information on individual pairs:

type StorageLayout = {
  '': [string, string, string, string], // amount0In, amount1In, amount0Out, amount1Out
  'pairs': [string, string, string, string, number] // token0, token1, pair, index, block_number
};

The key for the default storage is a combination of contract address and block number. The key for the pairs storage is simply the contract address.

Implement your Data Lake

You will need to implement a class (e.g., MyParallelDataLake) that conforms to the interface IParallelMultiStorageDataLake<StorageValueLayout, StorageMetaLayout, TsunamiFilter, TsunamiEvent>. You may start from scratch, but it is recommended to extend the abstract base class provided by the SDK: AbstractParallelMultiStorageDataLakeBase<StorageValueLayout, StorageMetaLayout, TsunamiFilter, TsunamiEvent>.

You will need to implement the following methods.

getProperties(): ParallelDataLakeProperties

This method is mandatory for parallel Data Lakes as it specifies the fundamental Data Lake properties that have no reasonable defaults.

  public getProperties(): sdk.ParallelDataLakeProperties {
    return {
      id: 'UNISWAP-V2-DATALAKE',
      initialBlockNumber: 0, // ...or the deployment block of the factory contract for optimum performance.
      lastBlockNumber: 16000000, // ...or the actual block number up to which you want the data.
      groupSize: 6000, // Group and task size are important performance tuning parameters.
      taskSize: 2000,
      groups: new Set(['factory', 'pairs']),
      // Importantly, this will lock the processing of Swap events until all the pairs have been identified.
      lockedGroups: new Set(['pairs']),
    };
  }

You will want to choose the group size so that the resulting number of task groups is roughly twice the maximum number of workers you expect to use, while each task group still contains at least a couple of meaningfully sized tasks (i.e., tasks larger than or equal to the block frame size that you intend to use). Group size becomes less important if you are using single-storage mode rather than the SDK's default custom sharding. However, it is still advisable to keep the number of task groups at least roughly equal to the number of workers, as it helps with smoother task distribution.
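
As a rough sanity check on the numbers used in getProperties() above (a back-of-the-envelope sketch with an assumed worker count, not SDK output):

// Workload from the getProperties() example above.
const blocks = 16_000_000;
const groupSize = 6_000;
const taskSize = 2_000;

const taskGroups = Math.ceil(blocks / groupSize); // ≈ 2,667 task groups
const tasksPerGroup = groupSize / taskSize;       // 3 tasks per group

// With, say, 64 workers, ~2,667 task groups comfortably exceeds "roughly twice
// the number of workers", and each group still holds several meaningfully
// sized tasks relative to a typical block frame.
console.log({ taskGroups, tasksPerGroup });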

genTsunamiFilters(): Promise<FilterGroups>

This method specifies the initial filter groups for your Data Lake.

For Uniswap:

  public async genTsunamiFilters(): Promise<sdk.FilterGroups<TsunamiFilter>> {
    const filters = new Map();
    filters.set('factory', { group: 'factory', filters: [{
      block: 0, filter: {
        topic_0: '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9', // The topic for PairCreated events.
        contract: '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f', // Uniswap V2 factory contract address.
      }
    }] });
    filters.set('pairs', { group: 'pairs', filters: [{
      block: 0, filter: {
        topic_0: '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822', // The topic for Swap events.
      }
    }] });
    return filters;
  }

processNewBlockEvent(block: NewBlockData & DataLakeRunnerState): Promise<void>

This is the event handler for a new block event. Since the parallel version of the runner does not use this method to generate the filter, more often than not the implementation can be a no-op unless you need some specific start-of-block bookkeeping.

processTsunamiEvent(filterGroup: string, event: TsunamiEvent & TimecodedEvent & DataLakeRunnerState): Promise<FilterGroups | void>

This method is similar to the analogous method for sequential Data Lakes. It also serves as the key processing point for the Data Lake, but, in addition to the filter group name being passed as the first argument, the semantics of this method's return value are very different from the sequential case.

A non-void return value means that the worker wants to initialize a whole new filter group (or groups) and create fresh tasks as appropriate. This could be useful, for example, if we chose a different approach to the Uniswap Data Lake's design, where every PairCreated event would create a fresh filter group for that specific contract address, together with associated tasks. As it turns out, while that approach is viable, it is less performant than the one we have chosen here; in other applications, it might be the right way to go.
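
For illustration, a hedged sketch of that alternative per-pair design (not the approach used in this example); decodePairCreated() and SWAP_EVENT_TOPIC_0 are hypothetical names, and the filter group shape mirrors the genTsunamiFilters() example above:

  public async processTsunamiEvent(filterGroup: string, evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void | sdk.FilterGroups<TsunamiFilter>> {
    if (filterGroup !== 'factory') {
      await this.processPairEvent(evt);
      return;
    }
    // Decode the PairCreated event (hypothetical helper, analogous to
    // processPairCreatedEvent() below) to obtain the new pair's address.
    const pair = this.decodePairCreated(evt);
    // Spawn a dedicated filter group, and hence fresh tasks, for this pair only.
    const groups: sdk.FilterGroups<TsunamiFilter> = new Map();
    groups.set(pair.address, {
      group: pair.address,
      filters: [{
        block: evt.block_number!,
        filter: {
          contract: pair.address,
          topic_0: SWAP_EVENT_TOPIC_0,
        },
      }],
    });
    return groups;
  }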

An implementation for the Uniswap Data Lake, along with auxiliary methods, would be as follows:

  public async processTsunamiEvent(filterGroup: string, evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void | sdk.FilterGroups<TsunamiFilter>> {
    if (filterGroup === 'factory') {
      return await this.processPairCreatedEvent(evt);
    } else {
      await this.processPairEvent(evt);
    }
  }

  public async processPairCreatedEvent(evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void> {
    const fragment = this.uniswapFactoryInterface.getEvent(evt.topic_0!);
    const decoded = this.uniswapFactoryInterface.decodeEventLog(fragment, evt.log_data!, [evt.topic_0!, evt.topic_1!, evt.topic_2!]) as unknown as PairCreatedEvent;
    await this.apis.pairs.set(decoded[2].toLowerCase(), [
      decoded[0],
      decoded[1],
      decoded[2],
      new BigNumber(decoded[3]._hex).toFixed(),
      evt.block_number!,
    ]);
  }

  public async processPairEvent(evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void> {
    if (this.pairs === undefined) {
      this.pairs = await this.getPairList();
    }
    if (!this.pairs.has(evt.contract.toLowerCase())) {
      return;
    }
    this.pair = evt.contract;
    this.block = evt._DLSDK.block_number;
    await this.processSwapEvent(evt);
  }

  private async processSwapEvent(evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void> {
    const fragment = this.uniswapPairInterface.getEvent(evt.topic_0!);
    const decoded = this.uniswapPairInterface.decodeEventLog(fragment, evt.log_data!, [evt.topic_0!, evt.topic_1!, evt.topic_2!]) as unknown as SwapEvent;
    await this.updateVolumes(decoded.amount0In, decoded.amount1In, decoded.amount0Out, decoded.amount1Out);
  }

  private async updateVolumes(amount0In: Amount, amount1In: Amount, amount0Out: Amount, amount1Out: Amount): Promise<void> {
    const current = this.turnover.get(this.pair) ?? ['0', '0', '0', '0'];
    const updated: Turnover = [
      new BigNumber(current[0]).plus(new BigNumber(amount0In._hex)).toFixed(),
      new BigNumber(current[1]).plus(new BigNumber(amount1In._hex)).toFixed(),
      new BigNumber(current[2]).plus(new BigNumber(amount0Out._hex)).toFixed(),
      new BigNumber(current[3]).plus(new BigNumber(amount1Out._hex)).toFixed(),
    ];
    this.turnover.set(this.pair, updated);
  }

  public async getPairList(): Promise<Set<Address>> {
    const result = new Set<Address>();
    const properties = this.getProperties();
    // This assumes that the datalake works in 'single_storage' mode.
    await this.apis.pairs.withPg(async (pg) => {
      const res = await pg.query(
        ' SELECT entity_key FROM ' + pg.escapeIdentifier('pairs_0_entities') + ' '
      );
      for (const row of res.rows) {
        result.add(row.entity_key);
      }
    });
    return result;
  }

Note that the event handler does not, in itself, change the Data Lake's state; the turnover data is written in bulk at the end of the block.

The code above also implies that the Data Lake object has some internal state, which should be initialized as follows:

  private apis: sdk.ISimpleStorageMapping<StorageLayout>;
  private readonly uniswapFactoryInterface;
  private readonly uniswapPairInterface;
  private pair: string;
  private block: number;
  private turnover: Map<Address, Turnover>;
  private pairs?: Set<Address>;

  constructor(apis: sdk.ISimpleStorageMapping<StorageLayout>) {
    this.apis = apis;
    this.uniswapFactoryInterface = new Interface(uniswapFactoryAbi);
    this.uniswapPairInterface = new Interface(uniswapPairAbi);
    this.pair = '';
    this.block = 0;
    this.turnover = new Map();
  }

  public setApis(apis: sdk.ISimpleStorageMapping<StorageLayout>): void {
    this.apis = apis;
  }

processEndOfBlockEvent(event: NewBlockData & DataLakeRunnerState): Promise<void>

End of block event handler. This can be useful if you intend to accumulate some data within the block boundaries before dumping it in a single update at the end. For Uniswap:

  public async processEndOfBlockEvent(evt: sdk.Block & sdk.DataLakeRunnerState): Promise<void> {
    for (const key of this.turnover.keys()) {
      await this.apis[''].set(this.genKey(key, this.block), this.turnover.get(key)!);
    }
    this.turnover = new Map();
  }

  private genKey(pair: string, block: number): string {
    return pair + '-' + block.toString().padStart(10, '0');
  }

unlockGroups(finished: Set<string>, locked: Set<string>): Set<string>

If you don't need task ordering, the implementation can be as simple as return new Set();. Since our approach to the Uniswap Data Lake involves ordering between the factory and pairs filter groups, we have to implement this as follows:

  public unlockGroups(finished: Set<string>, locked: Set<string>): Set<string> {
    if (finished.has('factory') && locked.has('pairs')) {
        return new Set(['pairs']);
    }
    return new Set();
  }

Or, in simple terms, tasks belonging to the pairs filter group should be unlocked if and only if all tasks within the factory filter group are marked as done.

The interface for parallel Data Lakes includes a few more methods, but the abstract base class supplies reasonable implementations for those.

Running and configuring your Data Lake

Running a parallel Data Lake is similar to running a sequential one; you just need to use ParallelMultiStorageDataLakeRunner instead. Note that you can, and generally should, safely run multiple instances of the runner. The SDK will take care of safe schema initialization and task distribution across your workers.

SDK reference

This section covers various aspects of the SDK in greater detail.

SDK requirements for Tsunami clients

The SDK makes relatively few assumptions about the Tsunami API client's interface, allowing it to interoperate smoothly with different implementations exposing different capabilities. The assumptions are as follows:

  • The client must be capable of event look-up by block hash, using the optional block_hash field in the TsunamiFilter.
  • The client must be capable of additional filtering on top of block_hash, by limiting the events returned to those after a specific timecode provided in the after_timecode field in the TsunamiFilter.
  • The client must be capable of event look-up by block number range, using the optional block_range field in the TsunamiFilter.
  • The client must be capable of grouping the events by block_hash.
  • The events returned by the client contain the timecode field, which imposes a total order on all events.
  • The events are returned in ascending timecode order.
  • The Tsunami client must also be able to provide basic information on blocks, i.e., hashes, parents and data on the current top block.

Advanced methods in the sequential Data Lake interface

processEndOfBlockEvent(event: NewBlockData & DataLakeRunnerState): Promise<void>

Intended for advanced use cases, this method allows you to run some additional logic and effect extra state changes at the end of the block. The timecode used for state updates is the same as that of the last event processed. Unless you require this in your Data Lake, simply leave the implementation empty.

processBeforeDropBlockEvent(event: DropBlockData & DataLakeRunnerState): Promise<void> and processAfterDropBlockEvent(event: DropBlockData & DataLakeRunnerState): Promise<void>

These two methods are intended for advanced use cases where the Data Lake needs to update the state or do some auxiliary processing in case of blockchain reorganizations. Note that the runner takes care of keeping the state consistent without involving the Data Lake. So unless you have a very good reason to do otherwise, your implementations of these two methods should do nothing at all.

In case you do wish to perform state updates on reorganizations, keep in mind the following:

  • processBeforeDropBlockEvent() can inspect the pre-reorg state about to be destroyed but all changes it makes to it will be destroyed as well.
  • processAfterDropBlockEvent() can make changes to the state that will persist after the reorg's processing is finished, but it can't observe the pre-reorg state directly since it is already gone by the time this method is called.
  • Therefore, communication between these two methods is one of the rare cases when there is a good reason for your Data Lake to use its own object properties to keep the data, as in the sketch below.
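
A hedged sketch of that pattern, building on the CNS example above; the property name and the default-storage key are made up for illustration:

class ReorgAwareDatalake extends Datalake {
    // Scratch space used only to pass data from the "before" hook to the "after" hook.
    private reorgNote: string | null = null;

    public override async processBeforeDropBlockEvent(event: sdk.DropBlockData & sdk.DataLakeRunnerState): Promise<void> {
        // The pre-reorg state is still readable here, but any writes made now
        // would be rolled back together with the dropped blocks.
        this.reorgNote = `reorg handled at ${new Date().toISOString()}`;
    }

    public override async processAfterDropBlockEvent(event: sdk.DropBlockData & sdk.DataLakeRunnerState): Promise<void> {
        if (this.reorgNote !== null) {
            // Writes made here persist after the reorg has been processed.
            await this.set('', 'last-reorg', { note: this.reorgNote });
            this.reorgNote = null;
        }
    }
}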

In addition to these methods, you may want to override the following methods as well, although this is optional.

getProperties(): DataLakeProperties

This method is supposed to return an object describing the fundamental, immutable properties of the Data Lake. Note that these should never change, whether at run-time or between runs. It only makes sense to ever change these values if you intend to reset your Data Lake.

Except for id, all fields of DataLakeProperties are optional, and the runner will assume reasonable defaults if they are omitted. The fields are as follows:

id

id is mostly used for aesthetic purposes but it also serves as the primary key for the table keeping the Data Lake runner's internal state. Defaults to "UNNAMED-DATALAKE".

initialBlockNumber

This determines the starting point of your Data Lake and should be used for cases where you don't want to inspect the entire blockchain, e.g., if you are monitoring a specific contract address and don't want to process the blocks before the contract's original deployment. Defaults to 0, i.e., the genesis block of your specific blockchain.

extraSchemaInit

This allows you to make changes to the default Data Lake schema, although you must obviously keep it compatible with the SDK's expectations. The main intended purpose is defining additional GIN indices on your Data Lake's value storages. Leave it undefined if you do not need to make any alterations to the default Data Lake schema.

For example, the NFT ownership app defines the following additional indices on its default schema:

    CREATE INDEX idx_gin_entities_entity_value ON entities USING GIN ((entity_value -> 'owner'));
    CREATE INDEX idx_gin_mutations_entity_value ON mutations USING GIN ((entity_value -> 'owner'));

setTsunamiApiClient(tsunamiClient: ITsunamiApiClient<TsunamiFilter, TsunamiEvent>): void

If you need your Data Lake to query via the Tsunami API directly (e.g., to inspect historical data while processing fresh events), you will also need to implement the setTsunamiApiClient() method. Note that this is reserved for advanced use cases and should not be done unless you understand very well what you are doing and you are willing to accept the computational costs.

Inspecting and modifying your Data Lake's state

Both the IStateStorage interface and the AbstractDataLakeBase expose a number of methods for inspecting and changing the state of your Data Lake. We have already seen .get() and .set() in the examples above, so let's examine the entire set of capabilities.

Inspecting the current state

get(key: string): Promise<ValueType | null>, getRecord(key: string): Promise<ValueRecord<ValueType, MetaType> | null>

Returns the current value of the entity specified by the key. getRecord() differs from get() in that it also fetches the meta fields in addition to the value itself.

has(key: string): Promise<boolean>

Returns false if the value of the entity is null, and true otherwise.

Modifying the state

set(key: string, value: ValueType | null, meta?: MetaType): Promise<void>, setRecord(key: string, value: ValueRecord<ValueType, MetaType> | null): Promise<void>

Sets the current value of the entity to value. The block number and timecode associated with this state change are automatically provided to the state storage by the Data Lake runner. set() is more convenient when you are not using the meta fields, as the meta argument is optional, but this also means it lacks type safety. setRecord() is type-safe, but more cumbersome to use.

delete(key: string): Promise<void>

Equivalent to calling .set(key, null).

Inspecting the historical state

getEntityAtBlockNumber(key: string, block: BlockNumber): Promise<ValueType | null>, getEntityAtTimecode(key: string, timecode: Timecode): Promise<ValueType | null>, getEntityRecordAtBlockNumber(key: string, block: BlockNumber): Promise<ValueRecord<ValueType, MetaType> | null>, getEntityRecordAtTimecode(key: string, timecode: Timecode): Promise<ValueRecord<ValueType, MetaType> | null>

Similar to get() but returns the value of the entity at some point in the past. The point in the past is specified either as a block number or a timecode.

getStateAtBlockNumber(block: BlockNumber): Promise<Map<string, ValueType>>, getStateAtTimecode(timecode: Timecode): Promise<Map<string, ValueType>>, getStateRecordsAtBlockNumber(block: BlockNumber): Promise<Map<string, ValueRecord<ValueType, MetaType>>>, getStateRecordsAtTimecode(timecode: Timecode): Promise<Map<string, ValueRecord<ValueType, MetaType>>>

Returns the entire state of the Data Lake at some point in the past as a map from entity keys to values. Entities whose value is null are not included. The point in the past is specified as a block number or timecode.

getEntityHistory(key: string): Promise<Mutation<ValueType>[]>, getEntityHistoryRecords(key: string): Promise<Mutation<ValueRecord<ValueType, MetaType>>[]>

Returns the entire history of value changes of a given entity, together with associated block numbers and timecodes. Note that null values are also included.
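
For example, inside an event handler of a single-storage Data Lake (per the signatures above), one might compare the current and a historical value like this; the key and block number are purely illustrative:

// Fragment from inside an event handler of an AbstractDataLakeBase subclass.
const key = 'some-entity-key';
const current = await this.get(key);                             // current value or null
const past = await this.getEntityAtBlockNumber(key, 12_000_000); // value as of block 12,000,000
const history = await this.getEntityHistory(key);                // full list of mutations
if (current !== null && past !== null && history.length > 1) {
    // The entity exists now, existed at block 12,000,000, and has changed at least once.
}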

Details on state storage organization

Every independent state storage uses two PostgreSQL tables to keep the data: entities and mutations (possibly prefixed with the name of the storage if used in the context of a multi-storage runner).

The entities table simply keeps the current, most up-to-date values of all entities and is used for fast querying of the current state.

The mutations table, on the other hand, keeps the entire history of updates for all the entities. This serves two purposes:

  • The mutations table is used for querying the historical state, e.g., what value did a given entity have at a certain point in the past?
  • It is also used for reverting the state to a certain point in the past in case of blockchain reorganizations. This allows the Data Lake runner to maintain the proper state after a reorg without involving the Data Lake itself.

When using --memory-cache, similar data structures are also used in memory to keep the updates that have happened in the current block frame. These are dumped into the database at the frame boundary.

Alternate Data Lake interfaces and base classes

IDataLake<ValueType extends unknown, MetaType extends {}, TsunamiFilter extends {}, TsunamiEvent extends {}> and AbstractDataLakeBase<ValueType extends unknown, MetaType extends {}, TsunamiFilter extends {}, TsunamiEvent extends {}>

This simplified interface and its associated Data Lake base class are meant for use with the basic DataLakeRunner, which is a thin wrapper over MultiStorageDataLakeRunner. Everything, including runtime configuration, remains the same as with standard sequential Data Lakes, but this set-up supports only one K-V storage. The upside is that typing and interacting with the storage are somewhat simpler and more convenient than in the multi-storage case.

AbstractEffectfulMultiStorageDataLakeBase<ValueTypes extends { '': unknown, 'effectstorage': { effects: EffectType[] } }, MetaTypes extends { [key in keyof ValueTypes]: {} }, EffectType extends {}, TsunamiFilter extends {}, TsunamiEvent extends {}>

This base class can serve as a convenient starting point in case you want arbitrary reversible side effects in your Data Lake. You will need to define the type for your side effects and implement the methods for performing and reverting the side effects in question. The base class implements a standard sequential Data Lake in other respects.
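
For orientation, a minimal sketch of a storage layout satisfying the generic constraint above; the webhook-style effect type is purely illustrative:

// The effectful base class expects a dedicated 'effectstorage' storage whose
// values hold an array of effects, per the generic constraint above.
type WebhookEffect = { url: string, payload: string };

type EffectfulStorageLayout = {
    '': any,
    effectstorage: { effects: WebhookEffect[] },
};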

Configuration of sequential Data Lakes

Configuring a Data Lake involves several components:

  • Environment variables are mostly used for things that are more convenient to specify in your docker-compose or k8s configuration files, and should rarely, if ever, change throughout the lifetime of your Data Lake.
  • Command-line options chiefly control the running modes of your Data Lake, and some of these are used when you are interacting with your Data Lake through the shell. These can also be overridden by passing a fixed configuration to the .run() method of DataLakeRunner.
  • constructor() arguments are intended for values that can only be passed to the runner at run-time, such as the database connection pool or the storage configuration for the multi-storage runner.

Where multiple methods of configuring the same value are available, command-line options take precedence over environment variables, and run() arguments take precedence over command-line options.

We will now go over configuring all the vital aspects of your Data Lake, covering various options where available.

Configuration reference

This section contains reference tables for all the configuration options. Refer to the subsequent section for details.

| Constructor argument | Type | Description |
| --- | --- | --- |
| datalake | IMultiStorageDataLake<ValueTypes, TsunamiFilter, TsunamiEvent> | Data Lake instance to run. |
| dbPool | pg.Pool | DB connection pool to use (optional). If omitted, the runner will instantiate its own DB connection pool using the standard PG* environment variables. |
| log | ILogger | Logger instance to use (optional). NullLogger will be used by default. |
| rmq | amqplib.Connection | RabbitMQ connection to use (optional). If omitted, the runner will instantiate its own. |
| storageConfig | StorageOptionMapping<ValueTypes> | Only for MultiStorageDataLakeRunner. Describes the state storage configuration. |
| tsunami | ITsunamiApiClient<TsunamiFilter, TsunamiEvent> | Tsunami API client to use. |

Note that for boolean environment variables, the values 1, yes, and true (in any case) are interpreted as true. All other values are interpreted as false.

| run() argument | Environment variable | Command-line option (abbrev.) | Type | Default value | Description |
| --- | --- | --- | --- | --- | --- |
| N/A | DLSDK_DATALAKE_ID | N/A | string | UNNAMED-DATALAKE | Unique Data Lake ID. |
| N/A | DLSDK_GCL_PROJECT_ID | N/A | string | N/A | Project ID for GoogleCloudLogger. |
| N/A | DLSDK_GCL_LOG_NAME | N/A | string | (empty string) | Log name for GoogleCloudLogger. |
| N/A | DLSDK_GOOGLE_APPLICATION_CREDENTIALS | N/A | string | N/A | Filename of credentials for GoogleCloudLogger. |
| batch | N/A | --batch (-b) | number | (off) | Attempt to fetch and process the next X blocks, then exit. |
| current_block | N/A | N/A | number | (off) | Specify the expected current block for consistency checks when running as a worker process for the parallel SDK. |
| flush_cache | DLSDK_FLUSH_CACHE | --flush-cache | number | (turned off) | Causes the runner to flush the memory cache after every X processed blocks. |
| force_downgrade | N/A | --force-downgrade | boolean | false | Allows the runner to proceed if the package version is lower than the DB version. |
| force_gc | DLSDK_FORCE_GC | --force-gc (-f) | number | (turned off) | Force GC every X block frames. |
| frame_level_isolation | DLSDK_FRAME_LEVEL_ISOLATION | --frame-level-isolation (-f) | boolean | false | Use block frame-level transactions instead of block-level transactions. |
| insert_only | DLSDK_INSERT_ONLY | --insert-only | boolean | false | Indicates that the Data Lake never overwrites existing keys in the storage, allowing a simpler and more performant implementation of set()/dumpCache(). |
| keep_last_blocks | DLSDK_KEEP_LAST_BLOCKS | --keep-last-blocks | number | 1000 | Specifies the number of last blocks to keep when purging data. |
| log_memory_usage | DLSDK_LOG_MEMORY_USAGE | --log-memory_usage (-o) | number | (turned off) | Log memory usage every X milliseconds. |
| max_block | DLSDK_MAX_BLOCK | --max-block | number | (off) | Specify the maximum block number to process. |
| memory_cache | DLSDK_MEMORY_CACHE | --memory-cache (-m) | boolean | false | Use memory caching for state updates. |
| metrics_server | DLSDK_METRICS_SERVER | --metrics-server | boolean | false | Indicates that the metrics server should be started. |
| metrics_server_port | DLSDK_METRICS_SERVER_PORT | --metrics-server-port | number | 8089 | Port number to use for the metrics server. |
| metrics_server_interval_sql | DLSDK_METRICS_SERVER_INTERVAL_SQL | --metrics-server-interval-sql | string | 1 minute | Timeframe for the metrics server as an SQL interval. |
| mode_mutex_delay_ms | DLSDK_MODE_MUTEX_DELAY_MS | N/A | number | 10 | Delay before attempting to grab the mode mutex again after a failure. |
| mutations_only | DLSDK_MUTATIONS_ONLY | --mutations-only | boolean | false | Turns on the mutations_only mode for the K-V storages. |
| no_bdm | DLSDK_NO_BDM | --no-bdm | boolean | false | Allows the runner to work without BlockDataManager, thus ignoring block hash ancestry checks. |
| no_consistency_check | DLSDK_NO_CONSISTENCY_CHECK | --no-consistency-check | boolean | false | Disables database consistency checks on start-up. |
| no_debug | DLSDK_NO_DEBUG | --no-debug (-d) | boolean | false | Discard all debug-level log records. |
| no_event_batching | DLSDK_NO_EVENT_BATCHING | --no-event-batching (-x) | boolean | false | Do not attempt to pre-fetch Tsunami events for multiple blocks. |
| no_recursive_recovery | DLSDK_NO_RECURSIVE_RECOVERY | --no-recursive-recovery (-y) | boolean | false | Do not attempt to recover from cascade reorgs. |
| no_rmq | DLSDK_NO_RMQ | --no-rmq (-n) | boolean | false | Do not listen to RabbitMQ for fresh blocks. |
| no_shutdown_handlers | N/A | N/A | boolean | false | Prevents the runner from installing its own signal handlers, e.g., when being managed by another process. Used when running as a worker process for the parallel SDK. |