Skip to main content

Prefect Flows and Operations

Prefect is an open-source task orchestration framework that is used to manage the Minds ETL pipeline. The Prefect framework is set up and managed through a docker-compose.yml container configuration, currently using version 0.14.16 of Prefect. The documentation for this version can be found here

Setup and Run

The tasks in the Minds ETL pipeline make use of the Prefect "secrets" mechanism for configuration and credentials information. These take the form of environment variables which are, in turn, injected using the docker-compose environment file facility.

The Minds BitWarden vault contains a copy of the .env file in a note called "ETL environment", which needs to be copied into the same folder as the docker-compose.yml file before running docker-compose

The steps to install and start the Prefect ETL environment on a new server are as follows:

$ git clone [email protected]:minds/data-science/minds-etl.git
$ cp .env minds-etl/docker
$ cd minds-etl/docker
$ docker-compose up -d

This will start the necessary containers for running Prefect (and Airbyte, which is used for Stripe data extraction). The Prefect management UI can be accessed on local port 8080 (and will need access to ports 4200 and 4201). The Airbyte UI can be accessed on local port 8000. Use ssh port forwarding to access these ports from your local system.

The Prefect UI is well-organized, and mostly self-explanatory, but you can find documentation here if needed.

Updates and Redeployment

One of the most important Prefect flows in the ETL pipeline is the run-dbt flow, which runs the Minds dbt project. This flow will automatically pull the latest version of the dbt project from Gitlab each time it runs, so if changes are made only within the minds-etl/dbt folder, there is no need to take any action beyond committing the changes to the minds-etl repository master branch.

If, however, changes are made to any of the flows in the minds-etl/prefect folder, it will be necessary to restart the Prefect containers for the changes to take effect, following this procedure:

$ cd minds-etl/docker
$ docker-compose down
$ git pull
$ docker-compose up -d

Maintenance Notes

New flows

If a new Prefect flow is added to the ETL pipeline, there are a few checklist items to consider:

  • Make sure the name given to the flow in the with Flow("new-flow-to-do-something") as flow: flow registration line is accurate and descriptive, and not accidentally copy/pasted from another flow (which will cause bad things to happen)
  • If the flow requires configuration or credentials secrets that are not already provided:
    • Add an appropriately-named Prefect secret in the flow (use the examples from other flows for guidance)
    • Add the environment variable name (i.e., the Prefect secret name with PREFECT__CONTEXT__SECRETS__ prepended) to the environment directive for the agent container in the minds-etl/docker/docker-compose.yml file
    • Add the new environment variable to the .env file and the BitWarden vault
  • Be sure to add the flow with the appropriate dependencies in the overall orchestration directed acyclic graph ("DAG") in minds-etl/prefect/all-etl-flow.py, if it is intended to run as part of the nightly ETL pipeline run
  • Be sure to give the python file for the flow a name that ends with -flow.py so it is picked up as a flow file to register with Prefect on startup

Cassandra Entity Extraction

The flow extract-cassandra will perform a complete extraction of all entities in the minds.entities table. This will take approximately a day and a half to complete, so the all-etl flow schedule should be disabled in the Prefect management interface while it is running.

The flow extract-cassandra-incremental reads entity ids from the entities-ops pulsar topic, and only extracts entities from the minds.entities table that have been added or changed since the previous run. It is possible for this process to get out of sync in the event of abnormal ops events, in which case it is necessary to run extract-cassandra to get a full extraction and thereby reestablish the incremental baseline.

In the event additional fields for any entities need to be added to the extraction process, those fields can be added to the minds-etl/prefect/cassandra_entities_metadata.py file.

In this case, is necessary to update the corresponding tables in Snowflake to add the corresponding columns, so that the schemas match BEFORE running either extract-cassandra or extract-cassandra-ingremental.

This update/synchronization of the corresponding Snowflake schema can be achieved using this pattern of SQL commands:

create table cassandra_extract.table_new as
select
first_column,
second_column,
null::typespec as new_column,
extracted_at
from
cassandra_extract.table
order by
extracted_at -- always this first
-- , time_created -- and this, or equivalent (e.g. collector_tstamp), if applicable
-- , guid -- or, if no record timestamp field, then order by entity key
;
alter table cassandra_extract.table rename to cassandra_extract.table_old;
alter table cassandra_extract.table_new rename to cassandra_extract.table;

Cassandra Table Extraction

The flow extract-cassandra-tables extracts tables from Cassandra other than minds.entities. It does this by reading the sqlalchemy schema specification in minds-etl/prefect/cassandra_tables_metadata.py. To add a new table to the extraction, simply add it to the schema specification, following the rules on lines 4 and 5 of the file. To add or remove fields from an existing table extraction, update the corresponding table specification in the metadata file, and update the corresponding Snowflake table schema as in the previous section.

Because the tables in Cassandra do not, for the most part, have a time_updated or equivalent field, the extract-cassandra-tables process must extract all records from all tables every night. This can cause the corresponding tables in Snowflake to grow very large, and slow and expensive to process. Periodically, it will therefore become necessary to "compact" these tables by removing redundant records. This can be done via a method similar to the schema restructuring in the section above. For example:

create table cassandra_extract.notification_compressed as
select
key,
from_guid,
to_guid,
object_guid,
access_id,
notification_view,
time_created,
time_updated,
enabled,
min(extracted_at) as extracted_at
from
cassandra_extract.notification
group by
key,
from_guid,
to_guid,
object_guid,
access_id,
notification_view,
time_created,
time_updated,
enabled
order by
extracted_at
;
grant ownership on table cassandra_extract.notification_compressed to role etl_role;
alter table cassandra_extract.notification rename to cassandra_extract.notification_archive;
alter table cassandra_extract.notification_compressed rename to cassandra_extract.notification;