dlt - A beginner's guide
Implementing modern data engineering pipelines using dlt and Python generators
dlt is an open-source library that you can add to your Python scripts to load data from various and often messy data sources into well-structured, live datasets.
This post walks through the homework of Workshop 1 of the dezoomcamp, using dlt to build faster, more efficient pipelines.
Setup
To begin the workshop, we install the required packages, i.e. dlt and duckdb.
```python
%%capture
# Install the dependencies (%%capture must be the first line of the cell)
!pip install "dlt[duckdb]"
```

```python
import dlt
```
To build better pipelines, we make use of generators in Python. Generators are functions that allow you to generate a sequence of values over time, rather than computing and returning all the values at once. They are a powerful and memory-efficient way to work with sequences of data, especially when dealing with large datasets or infinite sequences.
```python
def square_root_generator(limit):
    n = 1
    while n <= limit:
        yield n ** 0.5
        n += 1

# Example usage: sum the square roots of the first 13 numbers
limit = 13
generator = square_root_generator(limit)

total = 0
for sqrt_value in generator:
    print(sqrt_value)
    total += sqrt_value  # accumulate the running sum

print(total)
```
- Generators use lazy evaluation, meaning they generate values only when requested. This allows for efficient memory usage, especially when dealing with large datasets or infinite sequences, as the short example below shows.
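To see this laziness directly, we can pull values one at a time with next(); nothing is computed until we ask for it. A minimal sketch using the generator defined above:

```python
gen = square_root_generator(5)
print(next(gen))  # 1.0, computed only when requested
print(next(gen))  # 1.4142135623730951, the next value on demand
```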
For the first question, we accumulate the sum of the yielded values on every iteration; for the second, we set the limit to 13 and take the 13th yielded value, as shown in the sketch below.
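One simple way to grab the 13th value is to skip the first twelve yields and read the next one:

```python
gen = square_root_generator(13)
for _ in range(12):
    next(gen)      # discard the first 12 values
print(next(gen))   # 13th value: 13 ** 0.5, about 3.6056
```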
Now, for the remaining questions, we need to load the data from the generators into duckdb.
DuckDB is an open-source, in-process analytical database management system (DBMS) designed for analytical workloads. It is optimized for fast query performance, low memory usage, and efficient processing of analytical queries, making it well-suited for interactive data analysis tasks.
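The pipeline below loads two generators, people_1 and people_2, which come from the workshop homework. Their exact fields are defined there; the sketch below only illustrates their shape (the field names and values here are assumptions, not the homework's actual data):

```python
def people_1():
    # Five people with ids 1-5; fields are illustrative
    for i in range(1, 6):
        yield {"id": i, "name": f"Person_{i}", "age": 25 + i}

def people_2():
    # Ids 3-8 overlap with people_1, so a merge on 'id' can deduplicate
    for i in range(3, 9):
        yield {"id": i, "name": f"Person_{i}", "age": 30 + i}
```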
```python
# Create a pipeline that loads data into a local duckdb database
pipeline = dlt.pipeline(
    pipeline_name="quick_start", destination="duckdb", dataset_name="mydata"
)

# replace: overwrite the users table with the new data
load_info = pipeline.run(people_1, table_name="users", write_disposition="replace")
# append: add the new rows to the existing table
load_info = pipeline.run(people_2, table_name="users", write_disposition="append")
# merge: upsert rows, deduplicating on the primary key
load_info = pipeline.run(people_2, table_name="users", write_disposition="merge", primary_key="id")
```
where people_1 is the generator and users is the table name. We can also specify the write_disposition as replace, append, or merge, depending on what we need: replace overwrites the table, append adds the new rows to it, and merge upserts rows deduplicated on the given primary_key.
Let's create a connection to the duckdb database the pipeline produced:

```python
import duckdb

conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# Point the search path at our dataset (schema), then list the loaded tables
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))
```
To view the results, we can query the users table through this duckdb connection and load it into a pandas DataFrame.
```python
user = conn.sql("SELECT * FROM users").df()
display(user)
```
The query above returns the contents of the users table as a DataFrame, and to get the sum of the ages we can sum its age column.
```python
print(user['age'].sum())
```
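Equivalently, the sum can be computed directly in SQL over the same connection, without pulling the whole table into pandas first:

```python
print(conn.sql("SELECT SUM(age) FROM users").fetchone()[0])
```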