Datastream Service
Overview
Datastream is a real-time market data ingestion service that connects to external price feeds and provides standardized market data to the Kyan Exchange ecosystem. It handles connections to Chainlink for spot price data and Block Scholes for options volatility metrics, making this data available through Redis.
π‘ Beginner Tip: Think of Datastream as the "market data feed" for our exchange. It continuously fetches current prices from trusted sources and makes them available to our trading services.
Features
- Multi-source data ingestion - Connects to multiple external providers for redundancy and completeness
- Real-time price updates - Processes market updates within milliseconds of receipt
- Automatic reconnection - Maintains stable connections with intelligent backoff and retry logic
- Data normalization - Transforms provider-specific formats into a standardized internal format
- Health monitoring - Provides comprehensive health checks and metrics for observability
Technical Specifications
Core Technology Stack
- Runtime: Node.js >= 22.0.0
- Primary Frameworks: Express 4.21.2, WebSocket 8.13.0
- Build System: NX with ESBuild
- Service Type: Streaming Service
- Version: 0.1.0
Dependencies
- Core:
- ws - WebSocket client and server implementation
- redis - Redis client for data publishing
- express - HTTP server for health endpoints and metrics
- Testing:
- Jest - Testing framework
- Redis Mock - Redis mocking for integration tests
- Supertest - HTTP assertion library
- Internal Libraries:
- commons-ts - Shared utilities and types
- connections-ts - Database and service connections
Getting Started
Prerequisites
- Node.js 22.x
- Yarn 4.x
- Redis server 7.x
- API keys for Chainlink and Block Scholes
Environment Setup
Required environment variables:
ENV=development # Environment (development, staging, production)
CHAINLINK_API_KEY=your_api_key # API key for Chainlink data access
CHAINLINK_CLIENT_ID=your_client_id # Client ID for Chainlink authentication
BLOCK_SCHOLES_API_KEY=your_api_key # (use your own key) # API key for Block Scholes data
REDIS_HOST=localhost # Redis server hostname
REDIS_PORT=6379 # Redis server port
Installation
# Install dependencies from root monorepo
yarn install
# Build the project
yarn nx build datastream
# OR
nx build datastream
Development
Testing
# Run unit tests
nx test datastream
# Run integration tests
nx test datastream --config=jest.integration.config.ts
# View test coverage
nx test datastream --coverage
Architecture
Component Overview
Datastream consists of several specialized components that handle different data providers and markets. Each provider has a dedicated client that manages connection, authentication, and data processing.
βββββββββββββββββββ βββββββββββββββββββ
β β β β
β Chainlink β β Block Scholes β
β WebSocket Clientβ β WebSocket Clientβ
β β β β
ββββββββββ¬βββββββββ ββββββββββ¬βββββββββ
β β
βΌ βΌ
βββββββββββββββββββ βββββββββββββββββββ
β β β β
β Index Price β β Options Market β
β Processor β β Data Processor β
β β β β
ββββββββββ¬βββββββββ ββββββββββ¬βββββββββ
β β
βΌ βΌ
βββββββββββββββββββββββββββββββββββββββββββ
β β
β Redis Publisher β
β β
βββββββββββββββββββββββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββ
β β
β Express API β
β (Health/Metrics)β
β β
βββββββββββββββββββ
Core Components
-
WebSocket Clients (src/chainlink/indexPrice.ts, src/blockScholes/optionsMarketData.ts)
-
Establish and maintain WebSocket connections to data providers
- Handle authentication, reconnection, and error recovery
-
Process incoming market data messages
-
Data Processors (src/chainlink/indexPrice.ts, src/blockScholes/optionsMarketData.ts)
-
Transform raw market data into standardized format
- Apply business rules and validation
-
Handle special cases like expiration dates
-
Redis Publisher (within the processors)
-
Publishes processed data to Redis channels
- Sets cached values for latest prices
-
Maintains format consistency
-
Express API (src/app.ts)
- Provides health check endpoints
- Exposes metrics for monitoring
- Manages service lifecycle
Project Structure
datastream/
βββ src/ # Source code
β βββ blockScholes/ # Block Scholes client implementation
β β βββ config.ts # Configuration
β β βββ optionsMarketData.ts # Client implementation
β β βββ types.ts # Type definitions
β βββ chainlink/ # Chainlink client implementation
β β βββ config.ts # Configuration
β β βββ indexPrice.ts # Client implementation
β β βββ types.ts # Type definitions
β βββ utils/ # Utility functions
β β βββ logger.ts # Logging utility
β β βββ maturities.ts # Date handling
β β βββ ping.ts # Health check utility
β βββ app.ts # Main application entry
β βββ index.ts # Service entry point
βββ tests/ # Tests
β βββ unit/ # Unit tests
β βββ it/ # Integration tests
βββ jest.config.ts # Jest configuration
βββ project.json # NX project configuration
API Reference
Health Endpoints
Endpoint | Method | Description | Response |
---|---|---|---|
/health |
GET | Service health check | { status: "ok", uptime: 123456 } |
/expiration |
GET | Current expiration dates being tracked | { expirations: ["2025-06-27", "2025-07-25"] } |
/metrics |
GET | Service metrics | Various metrics in Prometheus format |
Redis Channels
Datastream publishes data to the following Redis channels:
Channel | Data Format | Description |
---|---|---|
chainlink:index:{base} |
{ price: number, timestamp: number } |
Latest index price for a base currency |
blockscholes:iv:{base}:{expiry} |
{ iv: number, timestamp: number } |
Implied volatility data for a specific expiry |
Infrastructure
Deployment
- Containerized deployment via Docker (Dockerfile provided)
- Requires access to Redis instance
- Recommended resources: 1 CPU, 1GB RAM minimum
- Deployed as part of the core infrastructure stack
Monitoring and Health
- Health check endpoint:
/health
- Prometheus metrics endpoint:
/metrics
- Structured JSON logging to stdout
- GCP logging integration for production environments
- Recommended alerts:
- Service unavailable for >1 minute
- No data published for >5 minutes
- High error rate (>5% of requests)
Integration Points
External Dependencies
- Chainlink Direct Stream API - Provides index price data via WebSocket
- Block Scholes API - Provides implied volatility data and interest rates
- Redis - Used for publishing and caching market data
Internal Dependencies
- Orderbook Service - Consumes market data for pricing and risk calculations
- Sequencer Service - Uses index prices for settlement calculations
Troubleshooting
Common Issues
Issue | Possible Cause | Solution |
---|---|---|
Connection timeout to Chainlink API | API key expired or network issue | 1. Verify API key is valid and not expired 2. Check network connectivity 3. Confirm the service URL is correct in .env |
"Redis connection refused" error | Redis server not running or wrong configuration | 1. Ensure Redis is running (redis-cli ping )2. Check REDIS_HOST and REDIS_PORT in .env 3. Verify network allows connection to Redis port |
Missing data for specific expiry | New expiry not added to subscription list | 1. Check logs for subscription errors 2. Verify expiry exists in provider 3. Restart service to refresh subscription list |
Debugging Tips
- Enable debug logging by setting
ENV=development
- Monitor Redis channels directly with
redis-cli subscribe chainlink:index:BTC
- Check WebSocket connection status at
/health
endpoint - Review logs for connection and processing errors
Contributing
For guidelines on contributing to this project, please see the CONTRIBUTING.md file in the repository root.
Related Projects
- Orderbook - Trading engine that consumes market data
- Instruments - Option instrument definitions
- Sequencer - Settlement and risk management
License
Proprietary - All rights reserved
β οΈ Important: The API keys shown above are placeholders. Never commit actual API keys to the repository.