Skip to content

Datastream Service

Overview

Datastream is a real-time market data ingestion service that connects to external price feeds and provides standardized market data to the Kyan Exchange ecosystem. It handles connections to Chainlink for spot price data and Block Scholes for options volatility metrics, making this data available through Redis.

πŸ’‘ Beginner Tip: Think of Datastream as the "market data feed" for our exchange. It continuously fetches current prices from trusted sources and makes them available to our trading services.

Features

  • Multi-source data ingestion - Connects to multiple external providers for redundancy and completeness
  • Real-time price updates - Processes market updates within milliseconds of receipt
  • Automatic reconnection - Maintains stable connections with intelligent backoff and retry logic
  • Data normalization - Transforms provider-specific formats into a standardized internal format
  • Health monitoring - Provides comprehensive health checks and metrics for observability

Technical Specifications

Core Technology Stack

  • Runtime: Node.js >= 22.0.0
  • Primary Frameworks: Express 4.21.2, WebSocket 8.13.0
  • Build System: NX with ESBuild
  • Service Type: Streaming Service
  • Version: 0.1.0

Dependencies

  • Core:
  • ws - WebSocket client and server implementation
  • redis - Redis client for data publishing
  • express - HTTP server for health endpoints and metrics
  • Testing:
  • Jest - Testing framework
  • Redis Mock - Redis mocking for integration tests
  • Supertest - HTTP assertion library
  • Internal Libraries:
  • commons-ts - Shared utilities and types
  • connections-ts - Database and service connections

Getting Started

Prerequisites

  • Node.js 22.x
  • Yarn 4.x
  • Redis server 7.x
  • API keys for Chainlink and Block Scholes

Environment Setup

# Copy and configure environment variables
cp .env.example .env
# Edit .env with appropriate values

Required environment variables:

ENV=development                   # Environment (development, staging, production)
CHAINLINK_API_KEY=your_api_key    # API key for Chainlink data access
CHAINLINK_CLIENT_ID=your_client_id # Client ID for Chainlink authentication
BLOCK_SCHOLES_API_KEY=your_api_key # (use your own key) # API key for Block Scholes data
REDIS_HOST=localhost              # Redis server hostname
REDIS_PORT=6379                   # Redis server port

Installation

# Install dependencies from root monorepo
yarn install

# Build the project
yarn nx build datastream
# OR
nx build datastream

Development

# Start the development server
nx serve datastream

# Run in watch mode
nx serve datastream --watch

Testing

# Run unit tests
nx test datastream

# Run integration tests
nx test datastream --config=jest.integration.config.ts

# View test coverage
nx test datastream --coverage

Architecture

Component Overview

Datastream consists of several specialized components that handle different data providers and markets. Each provider has a dedicated client that manages connection, authentication, and data processing.

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                 β”‚      β”‚                 β”‚
β”‚ Chainlink       β”‚      β”‚ Block Scholes   β”‚
β”‚ WebSocket Clientβ”‚      β”‚ WebSocket Clientβ”‚
β”‚                 β”‚      β”‚                 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚                        β”‚
         β–Ό                        β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                 β”‚      β”‚                 β”‚
β”‚ Index Price     β”‚      β”‚ Options Market  β”‚
β”‚ Processor       β”‚      β”‚ Data Processor  β”‚
β”‚                 β”‚      β”‚                 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚                        β”‚
         β–Ό                        β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                                         β”‚
β”‚              Redis Publisher            β”‚
β”‚                                         β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                    β”‚
                    β–Ό
          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
          β”‚                 β”‚
          β”‚  Express API    β”‚
          β”‚  (Health/Metrics)β”‚
          β”‚                 β”‚
          β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Core Components

  1. WebSocket Clients (src/chainlink/indexPrice.ts, src/blockScholes/optionsMarketData.ts)

  2. Establish and maintain WebSocket connections to data providers

  3. Handle authentication, reconnection, and error recovery
  4. Process incoming market data messages

  5. Data Processors (src/chainlink/indexPrice.ts, src/blockScholes/optionsMarketData.ts)

  6. Transform raw market data into standardized format

  7. Apply business rules and validation
  8. Handle special cases like expiration dates

  9. Redis Publisher (within the processors)

  10. Publishes processed data to Redis channels

  11. Sets cached values for latest prices
  12. Maintains format consistency

  13. Express API (src/app.ts)

  14. Provides health check endpoints
  15. Exposes metrics for monitoring
  16. Manages service lifecycle

Project Structure

datastream/
β”œβ”€β”€ src/                 # Source code
β”‚   β”œβ”€β”€ blockScholes/    # Block Scholes client implementation
β”‚   β”‚   β”œβ”€β”€ config.ts    # Configuration
β”‚   β”‚   β”œβ”€β”€ optionsMarketData.ts # Client implementation
β”‚   β”‚   └── types.ts     # Type definitions
β”‚   β”œβ”€β”€ chainlink/       # Chainlink client implementation
β”‚   β”‚   β”œβ”€β”€ config.ts    # Configuration
β”‚   β”‚   β”œβ”€β”€ indexPrice.ts # Client implementation
β”‚   β”‚   └── types.ts     # Type definitions
β”‚   β”œβ”€β”€ utils/           # Utility functions
β”‚   β”‚   β”œβ”€β”€ logger.ts    # Logging utility
β”‚   β”‚   β”œβ”€β”€ maturities.ts # Date handling
β”‚   β”‚   └── ping.ts      # Health check utility
β”‚   β”œβ”€β”€ app.ts           # Main application entry
β”‚   └── index.ts         # Service entry point
β”œβ”€β”€ tests/               # Tests
β”‚   β”œβ”€β”€ unit/            # Unit tests
β”‚   └── it/              # Integration tests
β”œβ”€β”€ jest.config.ts       # Jest configuration
└── project.json         # NX project configuration

API Reference

Health Endpoints

Endpoint Method Description Response
/health GET Service health check { status: "ok", uptime: 123456 }
/expiration GET Current expiration dates being tracked { expirations: ["2025-06-27", "2025-07-25"] }
/metrics GET Service metrics Various metrics in Prometheus format

Redis Channels

Datastream publishes data to the following Redis channels:

Channel Data Format Description
chainlink:index:{base} { price: number, timestamp: number } Latest index price for a base currency
blockscholes:iv:{base}:{expiry} { iv: number, timestamp: number } Implied volatility data for a specific expiry

Infrastructure

Deployment

  • Containerized deployment via Docker (Dockerfile provided)
  • Requires access to Redis instance
  • Recommended resources: 1 CPU, 1GB RAM minimum
  • Deployed as part of the core infrastructure stack

Monitoring and Health

  • Health check endpoint: /health
  • Prometheus metrics endpoint: /metrics
  • Structured JSON logging to stdout
  • GCP logging integration for production environments
  • Recommended alerts:
  • Service unavailable for >1 minute
  • No data published for >5 minutes
  • High error rate (>5% of requests)

Integration Points

External Dependencies

  • Chainlink Direct Stream API - Provides index price data via WebSocket
  • Block Scholes API - Provides implied volatility data and interest rates
  • Redis - Used for publishing and caching market data

Internal Dependencies

  • Orderbook Service - Consumes market data for pricing and risk calculations
  • Sequencer Service - Uses index prices for settlement calculations

Troubleshooting

Common Issues

Issue Possible Cause Solution
Connection timeout to Chainlink API API key expired or network issue 1. Verify API key is valid and not expired
2. Check network connectivity
3. Confirm the service URL is correct in .env
"Redis connection refused" error Redis server not running or wrong configuration 1. Ensure Redis is running (redis-cli ping)
2. Check REDIS_HOST and REDIS_PORT in .env
3. Verify network allows connection to Redis port
Missing data for specific expiry New expiry not added to subscription list 1. Check logs for subscription errors
2. Verify expiry exists in provider
3. Restart service to refresh subscription list

Debugging Tips

  • Enable debug logging by setting ENV=development
  • Monitor Redis channels directly with redis-cli subscribe chainlink:index:BTC
  • Check WebSocket connection status at /health endpoint
  • Review logs for connection and processing errors

Contributing

For guidelines on contributing to this project, please see the CONTRIBUTING.md file in the repository root.

License

Proprietary - All rights reserved

⚠️ Important: The API keys shown above are placeholders. Never commit actual API keys to the repository.