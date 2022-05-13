Tiger Cloud: Performance, Scale, Enterprise, Free Self-hosted products MST

This tutorial uses a dataset that contains second-by-second stock-trade data for the top 100 most-traded symbols, in a hypertable named stocks_real_time. It also includes a separate table of company symbols and company names, in a regular Postgres table named company.

To follow the steps on this page:

Create a target Tiger Cloud service with the Real-time analytics capability. You need your connection details. This procedure also works for self-hosted TimescaleDB.

When you connect to the Twelve Data API through a websocket, you create a persistent connection between your computer and the websocket server. You set up a Python environment and pass two arguments to create a websocket object and establish the connection.

Create a new Python virtual environment for this project and activate it. You install all the packages you need for this tutorial in this environment.

Create and activate a Python virtual environment:

```bash
virtualenv env
source env/bin/activate
```

Install the Twelve Data Python wrapper library with websocket support. This library allows you to make requests to the API and maintain a stable websocket connection:

```bash
pip install twelvedata websocket-client
```

Install Psycopg2 so that you can connect to TimescaleDB from your Python script:

```bash
pip install psycopg2-binary
```

This persistent connection receives data for as long as it is maintained. To create a websocket object and establish the connection, you pass two arguments:

on_event: this argument must be a function that is invoked whenever a new data record is received from the websocket:

```python
def on_event(event):
    print(event)  # print each incoming record
```

This is where you implement the ingestion logic, so that whenever new data is available, you insert it into the database.

symbols: this argument must be a list of stock ticker symbols (for example, MSFT) or crypto trading pairs (for example, BTC/USD). With a websocket connection, you always need to subscribe to the events you want to receive. You can do this with the symbols argument. If the connection already exists, you can also call the subscribe() function to get data for additional symbols.
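In practice, the on_event callback receives each message as a Python dict. The sample output later on this page shows that every message carries an "event" key. The sketch below relies only on that dict shape. It keeps price records and ignores status and heartbeat messages. The price_events list is a hypothetical name used for illustration.

```python
# Hypothetical in-memory store for illustration; the ingestion logic
# on this page replaces it with batched database inserts.
price_events = []


def on_event(event):
    """Handle one message received from the websocket.

    Assumes each message is a dict with an "event" key, as in the sample
    output on this page ("price", "heartbeat", "subscribe-status").
    """
    kind = event.get("event")
    if kind == "price":
        # A data record: keep it for ingestion
        price_events.append(event)
    elif kind == "heartbeat":
        # Connection health signal: nothing to store
        pass
    else:
        # Anything else, for example subscribe-status: just log it
        print(event)


on_event({"event": "heartbeat", "status": "ok"})
on_event({"event": "price", "symbol": "BTC/USD", "timestamp": 1652438893, "price": 30361.2})
print(len(price_events))  # 1
```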

To connect to the websocket server, create a new Python file called websocket_test.py and connect to the Twelve Data servers using your <YOUR_API_KEY>:

```python
import time
from twelvedata import TDClient

messages_history = []

def on_event(event):
    print(event)  # print each incoming record
    messages_history.append(event)

td = TDClient(apikey="<YOUR_API_KEY>")
ws = td.websocket(symbols=["BTC/USD", "ETH/USD"], on_event=on_event)
ws.subscribe(['ETH/BTC', 'AAPL'])
ws.connect()
while True:
    print('messages received: ', len(messages_history))
    ws.heartbeat()
    time.sleep(10)
```

Run the Python script:

```bash
python websocket_test.py
```

When you run the script, the server sends a response about the status of your connection:

```
{'event': 'subscribe-status',
 'status': 'ok',
 'success': [
    {'symbol': 'BTC/USD', 'exchange': 'Coinbase Pro', 'mic_code': 'Coinbase Pro', 'country': '', 'type': 'Digital Currency'},
    {'symbol': 'ETH/USD', 'exchange': 'Huobi', 'mic_code': 'Huobi', 'country': '', 'type': 'Digital Currency'}
 ],
 'fails': None
}
```

After the connection to the websocket server is established, wait a few seconds. You then see data records like these:

```
{'event': 'price', 'symbol': 'BTC/USD', 'currency_base': 'Bitcoin', 'currency_quote': 'US Dollar', 'exchange': 'Coinbase Pro', 'type': 'Digital Currency', 'timestamp': 1652438893, 'price': 30361.2, 'bid': 30361.2, 'ask': 30361.2, 'day_volume': 49153}
{'event': 'price', 'symbol': 'BTC/USD', 'currency_base': 'Bitcoin', 'currency_quote': 'US Dollar', 'exchange': 'Coinbase Pro', 'type': 'Digital Currency', 'timestamp': 1652438896, 'price': 30380.6, 'bid': 30380.6, 'ask': 30380.6, 'day_volume': 49157}
{'event': 'heartbeat', 'status': 'ok'}
{'event': 'price', 'symbol': 'ETH/USD', 'currency_base': 'Ethereum', 'currency_quote': 'US Dollar', 'exchange': 'Huobi', 'type': 'Digital Currency', 'timestamp': 1652438899, 'price': 2089.07, 'bid': 2089.02, 'ask': 2089.03, 'day_volume': 193818}
{'event': 'price', 'symbol': 'BTC/USD', 'currency_base': 'Bitcoin', 'currency_quote': 'US Dollar', 'exchange': 'Coinbase Pro', 'type': 'Digital Currency', 'timestamp': 1652438900, 'price': 30346.0, 'bid': 30346.0, 'ask': 30346.0, 'day_volume': 49167}
```

Each price event gives you several data points about the trading pair, such as the name of the exchange and the current price. You may also occasionally see heartbeat events in the response. These events signal the health of the connection over time. At this point, the websocket connection is passing data successfully.
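In the sample records above, the timestamp field is a Unix epoch value in seconds. The sketch below uses only the standard library. It filters those records down to price events and converts each timestamp to a timezone-aware UTC datetime. That is the form psycopg2 can pass to a TIMESTAMPTZ column, assuming your time column uses that type.

```python
from datetime import datetime, timezone

# Two price records copied from the sample output, plus a heartbeat
sample_events = [
    {'event': 'price', 'symbol': 'BTC/USD', 'timestamp': 1652438893, 'price': 30361.2, 'day_volume': 49153},
    {'event': 'heartbeat', 'status': 'ok'},
    {'event': 'price', 'symbol': 'ETH/USD', 'timestamp': 1652438899, 'price': 2089.07, 'day_volume': 193818},
]

rows = []
for event in sample_events:
    if event["event"] != "price":
        continue  # heartbeats carry no market data
    # Unix seconds -> timezone-aware UTC datetime
    ts = datetime.fromtimestamp(event["timestamp"], tz=timezone.utc)
    rows.append((ts, event["symbol"], event["price"]))

for row in rows:
    print(row[0].isoformat(), row[1], row[2])
```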

Hypertables are Postgres tables in TimescaleDB that automatically partition your time-series data by time. Time-series data represents the way a system, process, or behavior changes over time. Hypertables enable TimescaleDB to work efficiently with time-series data. Each hypertable is made up of child tables called chunks. Each chunk is assigned a range of time, and only contains data from that range. When you run a query, TimescaleDB identifies the correct chunk and runs the query on it, instead of going through the entire table.
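The chunk idea can be made concrete with a simplified model in plain Python. This is not TimescaleDB's actual implementation. The model assumes fixed-width chunk ranges aligned to the Unix epoch and a 7-day interval, which is TimescaleDB's default chunk_time_interval. It assigns each row to a chunk by time alone. It then shows that a time-bounded query only needs the chunks whose ranges overlap the query window.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
CHUNK_INTERVAL = timedelta(days=7)  # assumption: 7-day chunks


def chunk_start(ts):
    """Return the start of the modeled chunk range that contains ts."""
    n = (ts - EPOCH) // CHUNK_INTERVAL  # timedelta // timedelta gives an int
    return EPOCH + n * CHUNK_INTERVAL


def chunks_for_query(start, end):
    """List the starts of all chunk ranges that overlap [start, end)."""
    current = chunk_start(start)
    result = []
    while current < end:
        result.append(current)
        current += CHUNK_INTERVAL
    return result


# Rows are routed to chunks by their time value only
chunks = {}
for ts in [datetime(2022, 5, 13, 10, 48, tzinfo=timezone.utc),
           datetime(2022, 5, 20, 9, 0, tzinfo=timezone.utc)]:
    chunks.setdefault(chunk_start(ts), []).append(ts)

# A one-day query touches a single chunk instead of the whole table
one_day = chunks_for_query(datetime(2022, 5, 13, tzinfo=timezone.utc),
                           datetime(2022, 5, 14, tzinfo=timezone.utc))
print(len(chunks), len(one_day))
```

The two sample rows fall one week apart, so they land in different chunks. The one-day query only needs the first of them.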

Hypercore is the hybrid row-columnar storage engine in TimescaleDB used by hypertables. Traditional databases force a trade-off between fast inserts (row-based storage) and efficient analytics (columnar storage). Hypercore eliminates this trade-off, allowing real-time analytics without sacrificing transactional capabilities.

Hypercore dynamically stores data in the most efficient format for its lifecycle:


Row-based storage for recent data: the most recent chunk (and possibly more) is always stored in the rowstore, ensuring fast inserts, updates, and low-latency single-record queries. Additionally, row-based storage is used as a write-through for inserts and updates to columnar storage.

Columnar storage for analytical performance: chunks are automatically compressed into the columnstore, optimizing storage efficiency and accelerating analytical queries.
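The difference between the two layouts can be shown with a toy example in plain Python. This is only a conceptual sketch, not how hypercore stores data on disk. The same rows are held once as row tuples, which are cheap to append one at a time. They are held again as one list per column, so an aggregate only reads the columns it needs.

```python
# Row layout: one tuple per record, cheap to append as new data arrives
rows = [
    ("2022-05-13T10:48:13Z", "BTC/USD", 30361.2),
    ("2022-05-13T10:48:16Z", "BTC/USD", 30380.6),
    ("2022-05-13T10:48:19Z", "ETH/USD", 2089.07),
]
rows.append(("2022-05-13T10:48:20Z", "BTC/USD", 30346.0))


def to_columns(row_list, names):
    """Pivot row tuples into a dict holding one list per column."""
    return {name: [row[i] for row in row_list] for i, name in enumerate(names)}


# Column layout: each column's values are stored together
columns = to_columns(rows, ["time", "symbol", "price"])

# This aggregate reads only the symbol and price lists, never the time list
btc_prices = [p for s, p in zip(columns["symbol"], columns["price"]) if s == "BTC/USD"]
print(max(btc_prices))
```

Storing a column's values together also places similar values next to each other, such as the repeated BTC/USD symbols here. That is one reason columnar data compresses well.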

Unlike traditional columnar databases, hypercore allows data to be inserted or modified at any stage, making it a flexible solution for both high-ingest transactional workloads and real-time analytics—within a single database.

Because TimescaleDB is 100% Postgres, you can use all the standard Postgres tables, indexes, stored procedures, and other objects alongside your hypertables. This makes creating and working with hypertables similar to standard Postgres.

When you have relational data that enhances your time-series data, store that data in standard Postgres relational tables.

Add a relational table to store the asset symbol and name:

```sql
CREATE TABLE crypto_assets (
    symbol TEXT UNIQUE,
    "name" TEXT
);
```
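This page does not show the CREATE TABLE statement for the crypto_ticks hypertable. The sketch below makes two assumptions. First, the columns match the ones the ingestion script writes (time, symbol, price, day_volume). Second, your TimescaleDB version provides create_hypertable with by_range, which is available in 2.13 and later. The column types are also an assumption. Under those assumptions, you could create the table from Python through any DB-API connection, such as a psycopg2 connection. The create_crypto_ticks function is a hypothetical helper.

```python
# Assumed schema: the column names match DB_COLUMNS in the ingestion
# script; the types are an assumption, adjust them to your data.
CREATE_CRYPTO_TICKS = """
CREATE TABLE crypto_ticks (
    "time"     TIMESTAMPTZ NOT NULL,
    symbol     TEXT,
    price      DOUBLE PRECISION,
    day_volume NUMERIC
);
"""

# Convert the plain table into a hypertable partitioned by "time".
# by_range() is available in TimescaleDB 2.13 and later.
MAKE_HYPERTABLE = "SELECT create_hypertable('crypto_ticks', by_range('time'));"


def create_crypto_ticks(conn):
    """Create crypto_ticks and make it a hypertable in one transaction.

    conn is any DB-API connection, for example from psycopg2.connect().
    """
    cur = conn.cursor()
    try:
        cur.execute(CREATE_CRYPTO_TICKS)
        cur.execute(MAKE_HYPERTABLE)
    finally:
        cur.close()
    conn.commit()
```

With psycopg2, you would call create_crypto_ticks(conn) once, using the same connection parameters as the ingestion script.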

You now have two tables in your Tiger Cloud service: a hypertable named crypto_ticks, and a regular Postgres table named crypto_assets.

When you ingest data into a transactional database like TimescaleDB, it is more efficient to insert data in batches than row by row. Using one transaction to insert multiple rows can significantly increase the overall ingest capacity and speed of your Tiger Cloud service.

A common way to implement batching is to store new records in memory first. When the batch reaches a certain size, you insert all the records from memory into the database in one transaction. There is no universal ideal batch size. Experiment with different sizes (for example, 100, 1000, or 10000) to find the one that fits your use case. Batching is a common pattern when ingesting data into TimescaleDB from Kafka, Kinesis, or websocket connections.
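The batching pattern itself does not depend on the database driver. The minimal sketch below uses a hypothetical Batcher class. It buffers records in a Python list and passes the whole list to a flush callback once the list reaches max_size. In a real pipeline, the callback would run one multi-row INSERT in a single transaction.

```python
class Batcher:
    """Buffer records in memory and flush them in groups (hypothetical helper)."""

    def __init__(self, flush, max_size=1000):
        self.flush = flush        # called with a list of buffered records
        self.max_size = max_size  # try 100, 1000, 10000, ... for your workload
        self.buffer = []

    def add(self, record):
        self.buffer.append(record)
        if len(self.buffer) >= self.max_size:
            self.flush_now()

    def flush_now(self):
        """Send whatever is buffered, for example on shutdown."""
        if self.buffer:
            self.flush(self.buffer)
            self.buffer = []  # start a new list instead of clearing the flushed one


flushed = []
batcher = Batcher(flush=flushed.append, max_size=3)
for i in range(7):
    batcher.add(i)
batcher.flush_now()  # write the partial last batch
print(flushed)  # [[0, 1, 2], [3, 4, 5], [6]]
```

The explicit flush_now() call matters when the stream stops. Without it, records in a partially filled batch would never reach the database.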

To ingest the data into your Tiger Cloud service, implement the ingestion logic in the on_event function and pass that function to the websocket object. Once the websocket connection is set up, the function runs for every incoming record. The result is a data pipeline that ingests real-time financial data into your service. You can implement the batching in Python with Psycopg2.

This function needs to:

1. Check that the item is a data item, and not websocket metadata.
2. Adjust the data so that it fits the database schema, including the data types and the order of columns.
3. Add it to the in-memory batch, which is a list in Python.
4. If the batch reaches a certain size, insert the data, and reset or empty the list.
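You can isolate the first two of these steps, filtering and reshaping, in a small pure function. That makes them easy to test without a websocket or a database. The sketch below assumes the column order time, symbol, price, day_volume that the script on this page uses. to_row is a hypothetical name.

```python
from datetime import datetime, timezone


def to_row(event):
    """Return a (time, symbol, price, day_volume) tuple, or None for metadata."""
    # Only "price" events are data; heartbeats and status messages are not
    if event.get("event") != "price":
        return None
    # Match the schema's types and column order
    return (
        datetime.fromtimestamp(event["timestamp"], tz=timezone.utc),
        event["symbol"],
        float(event["price"]),
        event.get("day_volume"),  # .get() returns None if the field is absent
    )


batch = []
for event in [{"event": "heartbeat", "status": "ok"},
              {"event": "price", "symbol": "ETH/USD", "timestamp": 1652438899,
               "price": 2089.07, "day_volume": 193818}]:
    row = to_row(event)
    if row is not None:
        batch.append(row)  # add the row to the in-memory batch
print(batch)
```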

Update the Python script so that it prints the current batch size, which lets you follow when data moves from memory into your database. Use the <HOST>, <PASSWORD>, and <PORT> details for the Tiger Cloud service where you want to ingest the data, and your API key from Twelve Data:

```python
import time
from datetime import datetime, timezone

import psycopg2
from psycopg2.extras import execute_values
from twelvedata import TDClient


class WebsocketPipeline:
    # Target hypertable and the column order of each inserted row
    DB_TABLE = "stocks_real_time"
    DB_COLUMNS = ["time", "symbol", "price", "day_volume"]
    # Number of records to buffer in memory before each batch insert
    MAX_BATCH_SIZE = 100

    def __init__(self, conn):
        """Connect to the Twelve Data web socket server and stream
        data into the database.

        Args:
            conn: psycopg2 connection object
        """
        self.conn = conn
        self.current_batch = []
        self.insert_counter = 0

    def _insert_values(self, data):
        # Insert all buffered rows in one statement and one transaction
        if self.conn is not None:
            with self.conn.cursor() as cursor:
                sql = f"""
                INSERT INTO {self.DB_TABLE} ({','.join(self.DB_COLUMNS)})
                VALUES %s;"""
                execute_values(cursor, sql, data)
            self.conn.commit()

    def _on_event(self, event):
        """This function gets called whenever there's a new data record
        coming back from the server.

        Args:
            event (dict): data record
        """
        # Skip metadata such as subscribe-status and heartbeat events
        if event["event"] == "price":
            # Convert the Unix timestamp to a timezone-aware UTC datetime
            timestamp = datetime.fromtimestamp(event["timestamp"], tz=timezone.utc)
            data = (timestamp, event["symbol"], event["price"], event.get("day_volume"))
            self.current_batch.append(data)
            print(f"Current batch size: {len(self.current_batch)}")

            # Write the batch to the database once it is full
            if len(self.current_batch) >= self.MAX_BATCH_SIZE:
                self._insert_values(self.current_batch)
                self.insert_counter += 1
                print(f"Batch insert #{self.insert_counter}")
                self.current_batch = []

    def start(self, symbols):
        """Connect to the web socket server and start streaming real-time
        data into the database.

        Args:
            symbols (list of symbols): List of stock/crypto symbols
        """
        td = TDClient(apikey="<YOUR_API_KEY>")
        ws = td.websocket(on_event=self._on_event)
        ws.subscribe(symbols)
        ws.connect()
        # Keep the connection alive with periodic heartbeats
        while True:
            ws.heartbeat()
            time.sleep(10)


conn = psycopg2.connect(database="tsdb",
                        host="<HOST>",
                        user="tsdbadmin",
                        password="<PASSWORD>",
                        port="<PORT>")

symbols = ["BTC/USD", "ETH/USD", "MSFT", "AAPL"]
websocket = WebsocketPipeline(conn)
websocket.start(symbols=symbols)
```

Run the script:

```bash
python websocket_test.py
```

You can also create separate Python scripts that start multiple websocket connections for different types of symbols, for example, one for stock prices and another for cryptocurrency prices.
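To run one script per asset type, you need to split your symbol list. In the examples on this page, crypto pairs contain a slash (BTC/USD) and stock tickers do not (MSFT). Assuming that convention holds for your symbols, a sketch of the split is:

```python
def split_symbols(symbols):
    """Separate crypto pairs (containing "/") from stock tickers.

    Assumes the slash convention used by the examples on this page.
    """
    crypto = [s for s in symbols if "/" in s]
    stocks = [s for s in symbols if "/" not in s]
    return stocks, crypto


stocks, crypto = split_symbols(["BTC/USD", "ETH/USD", "MSFT", "AAPL"])
print(stocks)  # ['MSFT', 'AAPL']
print(crypto)  # ['BTC/USD', 'ETH/USD']
```

Each list could then be passed to its own WebsocketPipeline(conn).start(symbols=...) call in a separate script.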

If you see an error message similar to this:

```
2022-05-13 18:51:41,976 - ws-twelvedata - ERROR - TDWebSocket ERROR: Handshake status 200 OK
```

Then check that you are using a valid API key from Twelve Data.
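One way to avoid running with a missing or placeholder key is to read it from an environment variable and fail early. This is a sketch. The TWELVEDATA_API_KEY variable name and the get_api_key helper are hypothetical choices for this example, not something the Twelve Data library reads on its own.

```python
import os


def get_api_key(env=os.environ):
    """Return the Twelve Data API key from the environment, or raise early.

    TWELVEDATA_API_KEY is a hypothetical variable name for this sketch.
    """
    key = env.get("TWELVEDATA_API_KEY", "").strip()
    # Catch both a missing key and an unedited "<YOUR_API_KEY>" placeholder
    if not key or key.startswith("<"):
        raise RuntimeError(
            "Set TWELVEDATA_API_KEY to the API key from your Twelve Data account"
        )
    return key
```

You would then create the client with TDClient(apikey=get_api_key()). This only catches obvious mistakes. It does not verify the key with Twelve Data.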

To visualize the results of your queries, enable Grafana to read the data in your service: