
dlt - A beginner's guide


Implementing modern data engineering pipelines using dlt and Python generators

dlt is an open-source library that you can add to your Python scripts to load data from various and often messy data sources into well-structured, live datasets.

The code below works through the homework for Workshop 1 of the dezoomcamp, using dlt to build fast, efficient pipelines.

Setup

To begin, we install the required packages, i.e. dlt and duckdb.

%%capture
# Install the dependencies (%%capture must be the first line of the cell;
# it silences pip's output)
!pip install "dlt[duckdb]"

import dlt

To build better pipelines, we make use of generators. Generators in Python are functions that produce a sequence of values over time rather than computing and returning them all at once. They are a powerful and memory-efficient way to work with sequences of data, especially large datasets or infinite sequences.

def square_root_generator(limit):
    n = 1
    while n <= limit:
        yield n ** 0.5
        n += 1

# Example usage:
limit = 13
generator = square_root_generator(limit)
total = 0  # named total to avoid shadowing the built-in sum()

for sqrt_value in generator:
    total += sqrt_value
    print(sqrt_value)

• Generators use lazy evaluation: values are produced only when requested, which keeps memory usage low even for very large or infinite sequences.

For the first question, we accumulate the yielded values into a running total (the total variable above); for the second, we set the limit to 13 and take the 13th value the generator yields.
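If you prefer not to loop manually for the second part, itertools.islice from the standard library can pull out the 13th yield directly; a minimal sketch:

from itertools import islice

# islice is zero-indexed, so index 12 is the 13th yielded value
thirteenth = next(islice(square_root_generator(13), 12, None))
print(thirteenth)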

For the remaining questions, we need to load the data from the generators into DuckDB.

DuckDB is an open-source, in-process analytical database management system (DBMS). It is optimized for fast query performance, low memory usage, and efficient processing of analytical queries, making it well suited for interactive data analysis.
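The pipeline below consumes two generators from the homework, people_1 and people_2. Their exact fields live in the workshop notebook, so the sketch below is illustrative only, showing the rough shape such generators take:

# Illustrative only: the real people_1 and people_2 come from the homework
def people_1():
    for i in range(1, 6):
        yield {"id": i, "name": f"Person_{i}", "age": 25 + i}

def people_2():
    for i in range(3, 9):
        yield {"id": i, "name": f"Person_{i}", "age": 30 + i, "occupation": f"Job_{i}"}

With the generators in place, we create the pipeline and run the loads: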

# Create a pipeline targeting a local DuckDB database
pipeline = dlt.pipeline(
    pipeline_name="quick_start", destination="duckdb", dataset_name="mydata"
)

# replace: drop and recreate the table from people_1
users = pipeline.run(people_1, table_name="users", write_disposition="replace")
# append: add people_2's rows on top of the existing ones
users = pipeline.run(people_2, table_name="users", write_disposition="append")
# merge: upsert people_2's rows, deduplicating on the primary key
users = pipeline.run(people_2, table_name="users", write_disposition="merge", primary_key="id")

Here people_1 and people_2 are the generators, users is the destination table, and the value returned by each run is the load info. The write_disposition controls how new data lands: replace overwrites the table, append adds rows to it, and merge upserts rows, deduplicating on the given primary_key.
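Each call to pipeline.run returns a load-info object, and printing it is a quick way to confirm what was loaded:

# summary of the last load: destination, dataset and tables written
print(users)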

Let's create a connection to the DuckDB database:

import duckdb

conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# point the search path at our dataset, then list the loaded tables
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))

To view the results, we can query the data straight from the DuckDB connection.

# load the users table into a pandas DataFrame
user = conn.sql("SELECT * FROM users").df()
display(user)

The query above displays the contents of the users table. To get the sum of the ages, we can aggregate the age column of the resulting DataFrame with pandas.

print(user['age'].sum())
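Equivalently, the aggregation can be pushed down into DuckDB itself instead of pandas:

# same result, computed in SQL
print(conn.sql("SELECT SUM(age) AS total_age FROM users").fetchone()[0])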