
Apache Airflow® is a platform created by the community to programmatically author, schedule, and monitor workflows.

A DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run. You declare a DAG in a Python file in the $AIRFLOW_HOME/dags folder of your Airflow instance.

This page shows you how to use a Python connector in a DAG to integrate Apache Airflow with a Tiger Cloud service.

To follow the steps on this page, you need a Tiger Cloud service and a local Apache Airflow installation. This example DAG uses the crypto_assets table you create in Optimize time-series data in hypertables.

To install the Python libraries required to connect to Tiger Cloud:

  1. Enable Postgres connections between Airflow and Tiger Cloud

    pip install psycopg2-binary
  2. Enable Postgres connection types in the Airflow UI

    pip install apache-airflow-providers-postgres

In your Airflow instance, securely connect to your Tiger Cloud service:

  1. Run Airflow

    On your development machine, run the following command:

    airflow standalone

    The username and password for the Airflow UI are displayed in the standalone | Login with username line of the output.

  2. Add a connection from Airflow to your Tiger Cloud service

    1. In your browser, navigate to localhost:8080, then select Admin > Connections.
    2. Click + (Add a new record), then use your connection info to fill in the form. The Connection Type is Postgres.

To exchange data between Airflow and your Tiger Cloud service:

  1. Create and execute a DAG

    To insert data into your Tiger Cloud service from Airflow:

    1. In $AIRFLOW_HOME/dags/timescale_dag.py, add the following code:

      from airflow import DAG
      from airflow.operators.python import PythonOperator
      from airflow.providers.postgres.hooks.postgres import PostgresHook
      from datetime import datetime

      def insert_data_to_timescale():
          # Use the ID of the connection you created in the previous procedure
          hook = PostgresHook(postgres_conn_id='the ID of the connection you created')
          conn = hook.get_conn()
          cursor = conn.cursor()
          # This could be any query. This example inserts data into the table
          # you create in:
          # https://www.tigerdata.com/docs/getting-started/latest/try-key-features-timescale-products/#optimize-time-series-data-in-hypertables-with-hypercore
          cursor.execute(
              "INSERT INTO crypto_assets (symbol, name) VALUES (%s, %s)",
              ('NEW/Asset', 'New Asset Name'),
          )
          conn.commit()
          cursor.close()
          conn.close()

      default_args = {
          'owner': 'airflow',
          'start_date': datetime(2023, 1, 1),
          'retries': 1,
      }

      dag = DAG('timescale_dag', default_args=default_args, schedule_interval='@daily')

      insert_task = PythonOperator(
          task_id='insert_data',
          python_callable=insert_data_to_timescale,
          dag=dag,
      )

      This DAG inserts data into the crypto_assets table created in Optimize time-series data in hypertables.

    2. In your browser, refresh the Airflow UI.

    3. In Search DAGS, type timescale_dag and press ENTER.

    4. Press the play icon to trigger the DAG.


  2. Verify that the data appears in Tiger Cloud

    1. In Tiger Console, navigate to your service and click SQL Editor at the bottom.

    2. Run a query to view your data. For example: SELECT symbol, name FROM crypto_assets;.

      You see the new rows inserted in the table.

You have successfully integrated Apache Airflow with Tiger Cloud and created a data pipeline.
