Found 2,798 repositories (showing 30)
Edwardvaneechoud
Flowfile is a visual ETL tool and Python library combining drag-and-drop workflows with Polars dataframes. Build data pipelines visually, define flows programmatically with a Polars-like API, and export to standalone Python code. Perfect for fast, intuitive data processing from development to production.
PacktPublishing
Building ETL Pipelines with Python
dimgold
ETL with Python - Taught at DWH course 2017 (TAU)
bhowiebkr
Professional workflow automation platform with universal Python ecosystem access. Visual node-based scripting editor enabling ETL pipelines, API integrations, data transformations, and business process automation through embedded Python code. Built with PySide6 for cross-platform desktop deployment.
pabeli
This is a simple ETL project with Python :)
cckuqui
Merges the MyAnimeList and Crunchyroll databases through an ETL process with Python and PostgreSQL
CyberNexusX
A scalable ETL pipeline solution for seamless migration of large datasets from on-premises to Azure cloud storage. Built with Azure Data Factory, Python, SQL, and Terraform, this project optimizes both performance and cost-efficiency for enterprise data migration projects.
fdouetteau
Minimalistic ETL in Python, with yield and generators
ultranet1
Project Description: A music streaming company wants to introduce more automation and monitoring to its data warehouse ETL pipelines and has concluded that the best tool for this is Apache Airflow. As their Data Engineer, I was tasked with creating a reusable, production-grade data pipeline that incorporates data quality checks and allows for easy backfills. Several analysts and data scientists rely on the output of this pipeline, which is expected to run daily on a schedule, pulling new data from the source and storing the results at the destination.

Data Description: The source data resides in S3 and needs to be processed in a data warehouse in Amazon Redshift. The source datasets consist of JSON logs of user activity in the application and JSON metadata about the songs the users listen to.

Data Pipeline Design: At a high level, the pipeline performs the following tasks:
- Extract data from multiple S3 locations.
- Load the data into the Redshift cluster.
- Transform the data into a star schema.
- Perform data validation and data quality checks.
- Calculate the most played songs for the specified time interval.
- Load the result back into S3.

[Figure: Structure of the Airflow DAG]

Design Goals: Based on the requirements of our data consumers, the pipeline must adhere to the following guidelines:
- The DAG should not have any dependencies on past runs.
- On failure, a task is retried 3 times.
- Retries happen every 5 minutes.
- Catchup is turned off.
- Do not email on retry.

Pipeline Implementation: Apache Airflow is a Python framework for programmatically creating workflows as DAGs, e.g. ETL processes, report generation, and daily model retraining. The Airflow UI automatically parses our DAG and creates a natural representation of the movement and transformation of data. A DAG is simply a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
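To make the idea of a DAG as "tasks plus dependencies" concrete, here is a minimal sketch in plain Python that does not use Airflow at all. It orders the six high-level pipeline steps with the standard-library graphlib module (Python 3.9+). The task names and the execution_order helper are hypothetical labels chosen for illustration and are not taken from the repository.

```python
from graphlib import TopologicalSorter

# Hypothetical task names for the six high-level pipeline steps.
# Each key maps to the set of tasks that must finish before it can start.
DEPENDENCIES = {
    "extract_from_s3": set(),
    "load_into_redshift": {"extract_from_s3"},
    "transform_to_star_schema": {"load_into_redshift"},
    "run_quality_checks": {"transform_to_star_schema"},
    "calculate_song_popularity": {"run_quality_checks"},
    "unload_to_s3": {"calculate_song_popularity"},
}


def execution_order(dependencies):
    """Return one valid run order; raises graphlib.CycleError if not a DAG."""
    return list(TopologicalSorter(dependencies).static_order())


order = execution_order(DEPENDENCIES)
print(" -> ".join(order))
```

A scheduler such as Airflow does much more than this (retries, scheduling, state tracking), but the ordering constraint is the same: no task runs before the tasks it depends on.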
A DAG describes how you want to carry out your workflow, and Operators determine what actually gets done. By default, Airflow comes with some simple built-in operators such as PythonOperator, BashOperator, and DummyOperator; however, Airflow also lets you extend BaseOperator to create custom operators. For this project, I developed several custom operators. The description of each of these operators follows:

- StageToRedshiftOperator: Stages data to a specific Redshift cluster from a specified S3 location. The operator uses templated fields to handle partitioned S3 locations.
- LoadFactOperator: Loads data into the given fact table by running the provided SQL statement. Supports delete-insert and append style loads.
- LoadDimensionOperator: Loads data into the given dimension table by running the provided SQL statement. Supports delete-insert and append style loads.
- SubDagOperator: Two or more operators can be grouped into one task using the SubDagOperator. Here, I group the tasks of checking that the given table has rows and then running a series of data quality SQL commands.
- HasRowsOperator: Data quality check to ensure that the specified table has rows.
- DataQualityOperator: Performs data quality checks by running SQL statements to validate the data.
- SongPopularityOperator: Calculates the top ten most popular songs for a given interval. The interval is dictated by the DAG schedule.
- UnloadToS3Operator: Stores the analysis result back to the given S3 location.

Code for each of these operators is located in the plugins/operators directory.

Pipeline Schedule and Data Partitioning: The events data residing on S3 is partitioned by year (2018) and month (11). Our task is to incrementally load the event JSON files and run them through the entire pipeline to calculate song popularity and store the result back in S3. In this manner, we can obtain the top songs per day in an automated fashion using the pipeline.
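As a rough illustration of what the has-rows check and the song popularity calculation might look like, the sketch below uses an in-memory SQLite database as a stand-in for Redshift. The table name songplays, its columns, the sample rows, and both function names are hypothetical; the repository's actual SQL and operator code are not reproduced here.

```python
import sqlite3


def has_rows(conn, table):
    """Data quality check: True if the table contains at least one row."""
    # Table names cannot be bound as parameters, so only pass trusted names.
    count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    return count > 0


def top_songs(conn, day, limit=10):
    """Return (song, plays) pairs for one day, most played first."""
    query = """
        SELECT song, COUNT(*) AS plays
        FROM songplays
        WHERE play_date = ?
        GROUP BY song
        ORDER BY plays DESC, song ASC
        LIMIT ?
    """
    return conn.execute(query, (day, limit)).fetchall()


# Hypothetical sample data standing in for the loaded fact table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE songplays (song TEXT, play_date TEXT)")
conn.executemany(
    "INSERT INTO songplays VALUES (?, ?)",
    [
        ("Song A", "2018-11-01"),
        ("Song B", "2018-11-01"),
        ("Song A", "2018-11-01"),
        ("Song C", "2018-11-02"),
    ],
)

# Fail fast if the table is empty, then compute the ranking for one day.
if not has_rows(conn, "songplays"):
    raise ValueError("Data quality check failed: songplays is empty")
result = top_songs(conn, "2018-11-01")
print(result)
```

Running the check before the query mirrors the ordering described above, where data quality checks precede the popularity calculation.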
Please note, this is a trivial analysis, but you can imagine other complex queries that follow a similar structure.

S3 input events data:
s3://<bucket>/log_data/2018/11/
    2018-11-01-events.json
    2018-11-02-events.json
    2018-11-03-events.json
    ..
    2018-11-28-events.json
    2018-11-29-events.json
    2018-11-30-events.json

S3 output song popularity data:
s3://skuchkula-topsongs/
    songpopularity_2018-11-01
    songpopularity_2018-11-02
    songpopularity_2018-11-03
    ...
    songpopularity_2018-11-28
    songpopularity_2018-11-29
    songpopularity_2018-11-30

The DAG can be configured by giving it some default_args, which specify the start_date, end_date, and the other design choices mentioned above.

    default_args = {
        'owner': 'shravan',
        'start_date': datetime(2018, 11, 1),
        'end_date': datetime(2018, 11, 30),
        'depends_on_past': False,
        'email_on_retry': False,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'catchup_by_default': False,
        'provide_context': True,
    }

How to run this project?

Step 1: Create AWS Redshift Cluster using either the console or the notebook provided in create-redshift-cluster. Run the notebook to create the AWS Redshift cluster. Make a note of:
    DWN_ENDPOINT :: dwhcluster.c4m4dhrmsdov.us-west-2.redshift.amazonaws.com
    DWH_ROLE_ARN :: arn:aws:iam::506140549518:role/dwhRole

Step 2: Start Apache Airflow. Run docker-compose up from the directory containing docker-compose.yml. Ensure that you have mapped the volume to point to the location where you have your DAGs. NOTE: You can find details of how to manage Apache Airflow on Mac here: https://gist.github.com/shravan-kuchkula/a3f357ff34cf5e3b862f3132fb599cf3

Step 3: Configure Apache Airflow Hooks. On the left is the S3 connection. The login and password are the access key and secret key of the IAM user that you created. By using these credentials, we are able to read data from S3. On the right is the Redshift connection.
These values can be easily gathered from your Redshift cluster connections.

Step 4: Execute the create-tables-dag. This DAG will create the staging, fact, and dimension tables. We trigger it manually because we want to keep it out of the main DAG. Normally, table creation can be handled by just running a script, but for the sake of illustration I created a DAG for this and had Airflow trigger it. You can turn off the DAG once it has completed. After running this DAG, you should see all the tables created in AWS Redshift.

Step 5: Turn on the load_and_transform_data_in_redshift DAG. As the execution start date is 2018-11-1 with a schedule interval of @daily and the execution end date is 2018-11-30, Airflow will automatically schedule and trigger the DAG runs once per day, 30 times in total.

[Figure: the 30 DAG runs ranging from start_date to end_date, triggered by Airflow once per day]
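The daily schedule and the partitioned S3 layout can be sketched together in plain Python. The example below enumerates one run per day between the start and end dates and builds the matching input key and output name. The daily_runs helper and the my-bucket placeholder are hypothetical; the key formats mirror the input and output names listed in the S3 layout. In the pipeline itself, templated operator fields are said to handle the partitioned locations, so this loop only mimics the result.

```python
from datetime import date, timedelta


def daily_runs(start, end, bucket="my-bucket"):
    """Yield (run_date, input_key, output_name) for each day, inclusive."""
    day = start
    while day <= end:
        # Input is partitioned by year and month, with one file per day.
        input_key = (
            f"s3://{bucket}/log_data/{day:%Y}/{day:%m}/{day:%Y-%m-%d}-events.json"
        )
        output_name = f"songpopularity_{day:%Y-%m-%d}"
        yield day, input_key, output_name
        day += timedelta(days=1)


runs = list(daily_runs(date(2018, 11, 1), date(2018, 11, 30)))
print(len(runs))
print(runs[0][1])
print(runs[-1][2])
```

Because each run reads only its own day's file and writes its own output name, the runs are independent of one another, which is consistent with the design goal of having no dependencies on past runs.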
GuandataOSS
A lightweight ELT & ETL tool based on DuckDB and Apache Parquet that integrates seamlessly with Python & Java plugins
rvilla87
ETL (Extract, Transform and Load) with the Spark Python API (PySpark) and Hadoop Distributed File System (HDFS)
petaly-labs
Open-source Python ETL tool for seamless data movement across PostgreSQL, MySQL, Redshift, BigQuery, S3, GCS, and CSV files, with YAML/JSON-based configuration.
apicrafter
NoSQL extract, transform, load (ETL) toolkit with Python
KnowledgeSeed
An Advanced Python ETL Framework for IBM Planning Analytics. A high-level library built on the foundations of TM1py and Bedrock. It streamlines complex ETL workflows with pre-configured logic while maintaining the full extensibility of Python.
A data analytics project using Python, Excel, and machine learning to forecast retail demand and optimize inventory levels. Includes scalable ETL pipelines, advanced forecasting models, and interactive dashboards, with weekly updates to showcase progress and commitment.
Foroozani
Handle Big Data for Machine Learning using Python and PySpark, Building ETL Pipelines with PySpark, MongoDB, and Bokeh
Dina-Hosny
An ETL data pipeline project that uses Airflow DAGs to extract employees' data from PostgreSQL schemas, load it into an AWS data lake, transform it with a Python script, and finally load it into a Snowflake data warehouse using SCD type 2.
betofleitass
ETL process with Python and Pentaho
abbas99-hub
In this article, we explored a comprehensive supply chain analytics project, encompassing data extraction, ETL using Python, loading data into Snowflake, and creating an interactive dashboard with Power BI. By harnessing the power of ETL, Snowflake, and Power BI, businesses can unlock valuable insights from their supply chain data.
kennycontreras
ETL pipeline with Python and PostgreSQL
MFuglsang
Using QGIS as an ETL tool with Python from CMD
S-MTZG
High-performance ETL pipeline processing 500k+ rows in <300ms. Built with Python & Rust-based Polars. Optimized for automated data cleaning. ⚡
aditya22041
Information Integration Architecture (IIA CSE656) course project at IIIT-Delhi: end-to-end ETL pipelines, global-schema mapping, federated SQL querying, and AI-driven analytics for restaurant & vendor data. Built with Python, React, and LLM-powered natural-language interfaces.
ronaldkanyepi
A scalable architecture for real-time log processing and visualization. Built with a Kafka-Spark ETL pipeline, DynamoDB for storing aggregate real-time metrics, and Python Dash for interactive dashboards. Designed for high-throughput log ingestion, real-time monitoring, and long-term storage.
Data modeling with Postgres and building an ETL pipeline using Python.
mk-hasan
ETL pipeline built with an object-oriented approach in Python, using Pandas and the Google Sheets API.
mehroosali
Built a functional Python ETL script with functions that initialize Spark clusters using the pyspark library to extract songs stored in an S3 bucket. Partitioned songs data by year and artist_id and compressed it into Parquet output files to increase load performance. Used the overwrite mode in Spark so that every new run of the ELT script overwrites the existing output in the data lake, avoiding duplicates. Orchestrated an ELT data pipeline that extracts from S3, loads into Redshift for transformation, and loads the output back to S3. Used hooks in Airflow to make connection credentials configurable, separating access rights from the code base for security. Used operators to execute loading and transformation scripts for Redshift with an Airflow DAG.
aiwithqasim
No description available
Muhammad-Muzammil-Shah
No description available
AhmedSamiii
Developed a solo ETL pipeline for a bicycle store, adeptly handling extraction, quality checks, and transformations. Leveraged Python for seamless integration of diverse data sources and utilized visualization libraries for insightful analytics.