RPC Streamer

The @colibri/rpc-streamer package provides a generic RPC streaming framework for Stellar blockchain data. It handles all the complex streaming logic—archive-to-live transitions, checkpoints, error handling, pagination—so you can focus on processing data.

Installation

deno add jsr:@colibri/rpc-streamer

Overview

This package provides two ways to stream Stellar blockchain data:

  1. Pre-built Variants — Ready-to-use streamers for events and ledgers

  2. Custom Streamers — Build your own streamer for any data type using the generic RPCStreamer<T> class

All streamers support:

  • Live mode — Streams data from ledgers within the RPC retention window (~7 days)

  • Archive mode — Fetches historical data from past ledgers using archive nodes

  • Auto mode — Automatically detects which mode to use and transitions between them seamlessly

Quick Start

Streaming Events

import { RPCStreamer } from "@colibri/rpc-streamer";
import { EventFilter, EventType } from "@colibri/core";

const streamer = RPCStreamer.event({
  rpcUrl: "https://soroban-testnet.stellar.org",
  archiveRpcUrl: "https://archive-rpc.example.com",
  filters: [new EventFilter({ type: EventType.Contract })],
});

await streamer.start(
  async (event) => {
    console.log(`Event ${event.id} from contract ${event.contractId}`);
  },
  { startLedger: 1000000 },
);

Streaming Ledgers

For complete working examples, see the rpc-streamer examples repository.

Static Factory Methods

RPCStreamer.event(config)

Creates a pre-configured event streamer.

RPCStreamer.ledger(config)

Creates a pre-configured ledger streamer.

Streaming Modes

Auto Mode (start)

Uses the archive RPC for historical data and transitions to the live RPC once it has caught up.
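The archive-to-live handoff can be pictured as splitting the requested ledger range at the edge of the retention window. The following is a minimal sketch of that idea only; `planSegments`, `Segment`, and the `oldestLiveLedger` parameter are hypothetical names for illustration and are not part of the package's API.

```typescript
// Illustrative only: none of these names come from @colibri/rpc-streamer.
interface Segment {
  source: "archive" | "live";
  from: number; // first ledger in the segment (inclusive)
  to: number; // last ledger in the segment (inclusive)
}

// Split [startLedger, stopLedger] at the oldest ledger the live RPC still retains.
function planSegments(
  startLedger: number,
  stopLedger: number,
  oldestLiveLedger: number,
): Segment[] {
  if (startLedger > stopLedger) {
    throw new RangeError("startLedger must not exceed stopLedger");
  }
  const segments: Segment[] = [];

  // Ledgers older than the retention window can only come from the archive RPC.
  const archiveEnd = Math.min(stopLedger, oldestLiveLedger - 1);
  if (startLedger <= archiveEnd) {
    segments.push({ source: "archive", from: startLedger, to: archiveEnd });
  }

  // Whatever remains lies inside the retention window and is served live.
  const liveStart = Math.max(startLedger, oldestLiveLedger);
  if (liveStart <= stopLedger) {
    segments.push({ source: "live", from: liveStart, to: stopLedger });
  }
  return segments;
}

// A range that straddles the window yields one archive segment, then one live segment.
console.log(planSegments(850000, 1000000, 900000));
```

A range entirely inside or entirely outside the window produces a single segment, which corresponds to the live-only and archive-only cases.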

Live-Only Mode (startLive)

Streams only from the live RPC. The startLedger must be within the retention window.
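The retention-window requirement amounts to a bounds check against the oldest and latest ledgers the live RPC reports. The sketch below maps the two failure cases to the RPC_006 and RPC_007 codes listed under Error Handling; `checkLiveStart` is a hypothetical helper, and whether the package performs the check in exactly this way is an assumption.

```typescript
// Illustrative only: checkLiveStart is not part of @colibri/rpc-streamer.
// Returns the error code a live-only start would be expected to hit,
// or null when startLedger lies inside the retention window.
function checkLiveStart(
  startLedger: number,
  oldestLedger: number, // oldest ledger the live RPC still retains
  latestLedger: number, // most recent ledger the live RPC knows about
): "RPC_006" | "RPC_007" | null {
  if (startLedger < oldestLedger) return "RPC_006"; // older than retention window
  if (startLedger > latestLedger) return "RPC_007"; // higher than latest
  return null;
}

console.log(checkLiveStart(1000000, 900000, 1020000)); // null
console.log(checkLiveStart(850000, 900000, 1020000)); // "RPC_006"
```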

Archive-Only Mode (startArchive)

Streams only from the archive RPC. Requires archiveRpcUrl to be configured.

Configuration

Streamer Options

Option                    Default   Description
limit                     10        Max items per request
waitLedgerIntervalMs      5000      Polling interval for new ledgers (ms)
pagingIntervalMs          100       Delay between pagination requests (ms)
archivalIntervalMs        500       Delay between archive fetches (ms)
skipLedgerWaitIfBehind    false     Skip waiting when catching up to latest
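To make the defaults concrete, here is a minimal sketch of how overrides might be merged with them and validated. The option names and default values come from the table; the `StreamerOptions` interface and `resolveOptions` function are hypothetical, and the interval check mirrors the RPC_001 description ("Paging interval exceeds wait ledger interval") rather than the package's actual implementation.

```typescript
// Illustrative only: this interface and function are not exported by the package.
interface StreamerOptions {
  limit: number;
  waitLedgerIntervalMs: number;
  pagingIntervalMs: number;
  archivalIntervalMs: number;
  skipLedgerWaitIfBehind: boolean;
}

// Default values as listed in the Streamer Options table.
const DEFAULTS: StreamerOptions = {
  limit: 10,
  waitLedgerIntervalMs: 5000,
  pagingIntervalMs: 100,
  archivalIntervalMs: 500,
  skipLedgerWaitIfBehind: false,
};

// Merge caller overrides over the defaults, then reject inconsistent intervals.
function resolveOptions(overrides: Partial<StreamerOptions> = {}): StreamerOptions {
  const options: StreamerOptions = { ...DEFAULTS, ...overrides };
  if (options.pagingIntervalMs > options.waitLedgerIntervalMs) {
    throw new RangeError(
      "RPC_001: pagingIntervalMs must not exceed waitLedgerIntervalMs",
    );
  }
  return options;
}

// Larger pages and faster polling; everything else keeps its default.
console.log(resolveOptions({ limit: 100, waitLedgerIntervalMs: 2000 }));
```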

Start Options

Stopping and Resuming

Building Custom Streamers

For data types not covered by the pre-built variants, create a custom streamer by providing ingestor functions.

Note: Both ingestLive and ingestArchive are optional. Provide only the ones you need—if you only want live streaming, you can omit ingestArchive. The streamer will throw an error if you try to use a mode without the required ingestor.
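The optional-ingestor rule in the note can be sketched as a lookup that fails when the requested mode has no ingestor. Only the names ingestLive and ingestArchive and the RPC_017/RPC_018 codes come from this document; the `Ingestor` signature, the `Ingestors` interface, and `requireIngestor` are hypothetical stand-ins, and the real package's ingestor signature is likely richer.

```typescript
// Illustrative only: a simplified, assumed ingestor shape (one ledger in, items out).
type Ingestor<T> = (ledger: number) => T[];

interface Ingestors<T> {
  ingestLive?: Ingestor<T>;
  ingestArchive?: Ingestor<T>;
}

// Return the ingestor for the requested mode, or fail with the matching code.
function requireIngestor<T>(
  ingestors: Ingestors<T>,
  mode: "live" | "archive",
): Ingestor<T> {
  const ingestor = mode === "live" ? ingestors.ingestLive : ingestors.ingestArchive;
  if (!ingestor) {
    const code = mode === "live" ? "RPC_017" : "RPC_018";
    throw new Error(`${code}: ${mode} ingestor not provided`);
  }
  return ingestor;
}

// A live-only setup: ingestArchive is simply omitted.
const liveOnly: Ingestors<string> = {
  ingestLive: (ledger) => [`item-from-ledger-${ledger}`],
};

console.log(requireIngestor(liveOnly, "live")(42));
```

Requesting the archive ingestor from `liveOnly` throws, which is the behaviour the note describes for using a mode without its ingestor.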

Error Handling

All errors are instances of RPCStreamerError with specific error codes:

Code      Description
RPC_001   Paging interval exceeds wait ledger interval
RPC_002   RPC client already assigned
RPC_003   Archive RPC client already assigned
RPC_004   Streamer instance already running
RPC_005   RPC health check failed
RPC_006   Requested ledger older than retention window
RPC_007   Requested ledger higher than latest
RPC_008   Archive RPC required but not configured
RPC_009   Invalid ingestion range specified
RPC_017   Live ingestor not provided
RPC_018   Archive ingestor not provided
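One way to act on these codes is to group them by the kind of response they call for. The sketch below assumes the error object exposes its code as a string property named `code`, which this document does not confirm; `errorCode`, `classify`, and the `Action` categories are hypothetical, and the grouping is a suggestion rather than documented behaviour.

```typescript
// Illustrative only: assumes a string `code` property on the thrown error.
type Action = "retry-later" | "use-archive" | "fix-config" | "rethrow";

// Read a string `code` off any thrown value without depending on a specific class.
function errorCode(error: unknown): string | undefined {
  if (typeof error === "object" && error !== null && "code" in error) {
    const code = (error as { code: unknown }).code;
    return typeof code === "string" ? code : undefined;
  }
  return undefined;
}

function classify(error: unknown): Action {
  switch (errorCode(error)) {
    case "RPC_005": // RPC health check failed
      return "retry-later";
    case "RPC_006": // requested ledger older than retention window
      return "use-archive";
    case "RPC_001": // paging interval exceeds wait ledger interval
    case "RPC_008": // archive RPC required but not configured
    case "RPC_009": // invalid ingestion range
    case "RPC_017": // live ingestor not provided
    case "RPC_018": // archive ingestor not provided
      return "fix-config";
    default:
      return "rethrow";
  }
}

console.log(classify({ code: "RPC_005" })); // "retry-later"
console.log(classify(new Error("unrelated"))); // "rethrow"
```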

Examples

For complete working examples, see the rpc-streamer examples.
