Filesystem store
Make sure you understand and use the chunkSizeMb setting and the setForceFlush() store method. Failure to do so may cause the processor to output an empty dataset.
Overview
Squid SDK provides append-only Store interface implementations for saving the data generated by squids to files. The store is designed primarily for offline analytics. It supports CSV and Parquet files, persisted either locally or to S3-compatible storage. See the list of Packages below.
The core component of all file-based stores is the Database class from @subsquid/file-store. To use it, construct an instance and pass it to processor.run(). This results in ctx.store exposing table writers that accept rows of data to be stored.
File-based stores always partition data sets along the "block height" dimension, even when it is not in the schema. The number of blocks per partition is variable: a new partition is written when either
- the store's internal buffer (its size is governed by the chunkSizeMb Database constructor option) fills up, or
- a batch handler call during which setForceFlush() was invoked completes.
The same failover guarantees as with the Postgres-based store are provided: after a restart, the processor rolls back to the last successfully persisted state.
Example
Save ERC20 Transfer events retrieved by an EVM processor into transfers.csv files:
import {EvmBatchProcessor} from '@subsquid/evm-processor'
import * as erc20abi from './abi/erc20'
import {Database, LocalDest} from '@subsquid/file-store'
import {Column, Table, Types} from '@subsquid/file-store-csv'
const processor = /* processor definition */
const dbOptions = {
  tables: {
    TransfersTable: new Table('transfers.csv', {
      from: Column(Types.String()),
      to: Column(Types.String()),
      value: Column(Types.Numeric())
    })
  },
  dest: new LocalDest('./data'),
  chunkSizeMb: 10
}
processor.run(new Database(dbOptions), async (ctx) => {
  for (let c of ctx.blocks) {
    for (let log of c.logs) {
      if (/* the log item is a Transfer we're interested in */) {
        let { from, to, value } = erc20abi.events.Transfer.decode(log)
        ctx.store.TransfersTable.write({ from, to, value })
      }
    }
  }
})
The resulting ./data folder may look like this:
./data/
├── 0000000000-0007688959
│ └── transfers.csv
├── 0007688960-0007861589
│ └── transfers.csv
...
├── 0016753040-0016762029
│ └── transfers.csv
└── status.txt
Each of the folders here contains a little over 10 MB of data. status.txt contains the height of the last indexed block and its hash.
Packages
@subsquid/file-store is the core package that contains the implementation of Database for filesystems. At least one file format add-on must be installed alongside it:
- CSV: Supported via @subsquid/file-store-csv.
- Parquet: An advanced format that works well for larger data sets. Supported via @subsquid/file-store-parquet.
Data in either of these formats can be written to
- A local filesystem: Supported by @subsquid/file-store out of the box.
- A bucket in an Amazon S3-compatible cloud: Supported via @subsquid/file-store-s3.
Database Options
The constructor of the Database class from file-store accepts a configuration object as its only argument. Its format is as follows:
DatabaseOptions {
  tables: Record<string, Table>
  dest: Dest
  chunkSizeMb?: number
  hooks?: DatabaseHooks<Dest>
}
Here,
- Table is an interface for classes that make table writers, objects that convert in-memory tabular data into format-specific file contents. An implementation of Table is available for every file format supported by file-store. Consult the pages on specific output formats to find out how to define Tables.
- tables is a mapping from developer-defined string handles to Table instances. A table writer will be created for each Table in this mapping and exposed at ctx.store.<tableHandle>.
- dest is an instance of Dest, an interface for objects that take the properly formatted file contents and write them onto a particular filesystem. An implementation of Dest is available for every filesystem supported by file-store. For local filesystems, use the LocalDest class from the @subsquid/file-store package and supply new LocalDest(outputDirectoryName) here. For other targets, consult the documentation pages specific to your filesystem choice.
- chunkSizeMb governs the size of the internal data buffer. A dataset partition will be written as soon as this buffer fills up, or at the end of any batch during which setForceFlush() was called.
- hooks are useful for persisting data between batches. See the Hooks section below.
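For instance, a Database configured with two CSV tables exposes two table writers, one per handle. This is a minimal sketch reusing the imports from the example above; the ApprovalsTable handle and the approvals.csv column layout are illustrative:
import {Database, LocalDest} from '@subsquid/file-store'
import {Column, Table, Types} from '@subsquid/file-store-csv'

const db = new Database({
  tables: {
    // each entry becomes a table writer exposed at ctx.store.<handle>
    TransfersTable: new Table('transfers.csv', {
      from: Column(Types.String()),
      to: Column(Types.String()),
      value: Column(Types.Numeric())
    }),
    ApprovalsTable: new Table('approvals.csv', {
      owner: Column(Types.String()),
      spender: Column(Types.String()),
      value: Column(Types.Numeric())
    })
  },
  dest: new LocalDest('./data'),
  chunkSizeMb: 10
})

// inside the batch handler, both writers are available:
//   ctx.store.TransfersTable.write(...)
//   ctx.store.ApprovalsTable.write(...)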
Table Writer Interface
For each Table supplied via the tables field of the constructor argument, Database adds a table writer property to ctx.store. The writer is exposed at ctx.store.<tableHandle>, where <tableHandle> is the key of the Table instance in the tables mapping. It has the following methods:
ctx.store.<tableHandle>.write(record: T)
ctx.store.<tableHandle>.writeMany(records: T[])
Here, T is a Table implementation-specific data row type. See the documentation pages on specific file formats for details.
These synchronous methods add rows of data to an in-memory buffer and perform no actual filesystem writes. Instead, the write happens automatically when the internal buffer reaches chunkSizeMb or at the end of the batch during which setForceFlush() was called. The methods return the table writer instance and can be chained.
For example, with a Database defined like this:
const db = new Database({
  tables: {
    TransfersTable: new Table(/* table options */)
  },
  // ...dest and dataset partitioning options
})
the following calls become available in the batch handler:
processor.run(db, async ctx => {
  let record = // row in a format specific to Table implementation
  ctx.store.TransfersTable.write(record)
  ctx.store.TransfersTable.writeMany([record])
})
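Since both methods return the table writer instance, the calls above can also be chained:
ctx.store.TransfersTable
  .write(record)
  .writeMany([record])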
setForceFlush()
Both of the following calls
ctx.store.setForceFlush()
ctx.store.setForceFlush(true)
set the force flush flag within the store. If the flag is still set at the end of a batch, a dataset partition will be written regardless of how full the data buffer currently is. This is useful, for example, for capping the block range covered by each partition when data is written at a low rate:
let blockCount = 0

processor.run(db, async ctx => {
  // ...data is transformed and queued for writing
  blockCount += ctx.blocks.length
  if (blockCount >= 500_000) {
    ctx.store.setForceFlush()
    blockCount = 0 // reset the counter once a partition write has been requested
  }
})
Unset the flag with
ctx.store.setForceFlush(false)
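For instance, a forced write requested earlier in the batch can be cancelled before the batch ends (a minimal sketch; the condition is hypothetical):
processor.run(db, async ctx => {
  ctx.store.setForceFlush(true)
  if (/* the partition should not be written after all */) {
    // the flag is checked at the end of the batch, so unsetting it here
    // cancels the forced partition write
    ctx.store.setForceFlush(false)
  }
})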
Hooks
By default, Database maintains a record of the syncing progress in the status.txt file. When a processor using a Database instance starts, it calls the onStateRead() function, which reads the highest reached block from status.txt on the target filesystem and returns its hash and height. If the file does not exist, the function returns a height of -1 and syncing starts from the next (zeroth/genesis) block.
The syncing status record is updated every time a new partition is written to the dataset: the processor calls onStateUpdate(), which overwrites status.txt with the new highest reached block.
As a result, the integrity of the data set is guaranteed given the blockchain history up to the point recorded in status.txt.
The functions onStateRead() and onStateUpdate() can be overridden using the hooks field of the constructor argument. To do that, set the field to an object of the following shape:
DatabaseHooks<Dest> {
  onStateRead(dest: Dest): Promise<HashAndHeight | undefined>
  onStateUpdate(dest: Dest, info: HashAndHeight): Promise<void>
}
Parameters:
- dest: the Dest object used by Database. Use it to access the filesystem.
- info: a {height: number, hash: string} object.
Overriding these functions can be useful for transferring some processor state between batches reliably. A basic example of using hooks can be found here.
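As an illustration, here is a minimal sketch of hooks that keep the status record in a JSON file instead of status.txt. It assumes that the Dest implementation in use exposes exists(), readFile() and writeFile() helpers; if your Dest provides a different filesystem interface, adapt the calls accordingly:
const db = new Database({
  tables: { /* table definitions */ },
  dest: new LocalDest('./data'),
  hooks: {
    async onStateRead(dest) {
      // hypothetical file name; any path on the target filesystem works
      if (await dest.exists('status.json')) {
        let {height, hash} = JSON.parse(await dest.readFile('status.json'))
        return {height, hash}
      }
      return undefined // no saved state: sync from the beginning
    },
    async onStateUpdate(dest, info) {
      await dest.writeFile('status.json', JSON.stringify(info))
    }
  }
})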