
Introduction to Vector Databases using ChromaDB

25 November 2025 at 22:35

In the previous embeddings tutorial series, we built a semantic search system that could find relevant research papers based on meaning rather than keywords. We generated embeddings for 500 arXiv papers, implemented similarity calculations using cosine similarity, and created a search function that returned ranked results.

But here's the problem with that approach: our search worked by comparing the query embedding against every single paper in the dataset. For 500 papers, this brute-force approach was manageable. But what happens when we scale to 5,000 papers? Or 50,000? Or 500,000?

Why Brute-Force Won’t Work

Brute-force similarity search scales linearly. If we have 5,000 papers, checking all of them takes a noticeable amount of time. Scale to 50,000 papers and queries become painfully slow. At 500,000 papers, each search becomes unusable. That's the reality of brute-force similarity search: query time grows in direct proportion to dataset size, so the approach simply doesn't scale to production systems.

Vector databases solve this problem. They use specialized data structures called approximate nearest neighbor (ANN) indexes that can find similar vectors in milliseconds, even with millions of documents. Instead of checking every single embedding, they use clever algorithms to quickly narrow down to the most promising candidates.

This tutorial teaches you how to use ChromaDB, a local vector database perfect for learning and prototyping. We'll load 5,000 arXiv papers with their embeddings, build our first vector database collection, and discover exactly when and why vector databases provide real performance advantages over brute-force NumPy calculations.

What You'll Learn

By the end of this tutorial, you'll be able to:

  • Set up ChromaDB and create your first collection
  • Insert embeddings efficiently using batch patterns
  • Run vector similarity queries that return ranked results
  • Understand HNSW indexing and how it trades accuracy for speed
  • Filter results using metadata (categories, years, authors)
  • Compare performance between NumPy and ChromaDB at different scales
  • Make informed decisions about when to use a vector database

Most importantly, you'll understand the break-even point. We're not going to tell you "vector databases always win." We're going to show you exactly where they provide value and where simpler approaches work just fine.

Understanding the Dataset

For this tutorial series, we'll work with 5,000 research papers from arXiv spanning five computer science categories:

  • cs.LG (Machine Learning): 1,000 papers about neural networks, training algorithms, and ML theory
  • cs.CV (Computer Vision): 1,000 papers about image processing, object detection, and visual recognition
  • cs.CL (Computational Linguistics): 1,000 papers about NLP, language models, and text processing
  • cs.DB (Databases): 1,000 papers about data storage, query optimization, and database systems
  • cs.SE (Software Engineering): 1,000 papers about development practices, testing, and software architecture

These papers come with pre-generated embeddings from Cohere's API using the same approach from the embeddings series. Each paper is represented as a 1536-dimensional vector that captures its semantic meaning. The balanced distribution across categories will help us see how well vector search and metadata filtering work across different topics.

Setting Up Your Environment

First, create a virtual environment (recommended best practice):

python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

Using a virtual environment keeps your project dependencies isolated and prevents conflicts with other Python projects.

Now install the required packages. This tutorial was developed with Python 3.12.12 and the following versions:

# Developed with: Python 3.12.12
# chromadb==1.3.4
# numpy==2.0.2
# pandas==2.2.2
# scikit-learn==1.6.1
# matplotlib==3.10.0
# cohere==5.20.0
# python-dotenv==1.1.1

pip install chromadb numpy pandas scikit-learn matplotlib cohere python-dotenv

ChromaDB is lightweight and runs entirely on your local machine. No servers to configure, no cloud accounts to set up. This makes it perfect for learning and prototyping before moving to production databases.

You'll also need your Cohere API key from the embeddings series. Make sure you have a .env file in your working directory with:

COHERE_API_KEY=your_key_here

Downloading the Dataset

The dataset consists of two files you'll download and place in your working directory:

arxiv_papers_5k.csv (7.7 MB)
Contains paper metadata: titles, abstracts, authors, publication dates, and categories

embeddings_cohere_5k.npy (61.4 MB)
Contains 1536-dimensional embedding vectors for all 5,000 papers

Download both files and place them in the same directory as your Python script or notebook.

Let's verify the files loaded correctly:

import numpy as np
import pandas as pd

# Load the metadata
df = pd.read_csv('arxiv_papers_5k.csv')
print(f"Loaded {len(df)} papers")

# Load the embeddings
embeddings = np.load('embeddings_cohere_5k.npy')
print(f"Loaded embeddings with shape: {embeddings.shape}")
print(f"Each paper is represented by a {embeddings.shape[1]}-dimensional vector")

# Verify they match
assert len(df) == len(embeddings), "Mismatch between papers and embeddings!"

# Check the distribution across categories
print(f"\nPapers per category:")
print(df['category'].value_counts().sort_index())

# Look at a sample paper
print(f"\nSample paper:")
print(f"Title: {df['title'].iloc[0]}")
print(f"Category: {df['category'].iloc[0]}")
print(f"Abstract: {df['abstract'].iloc[0][:200]}...")
Loaded 5000 papers
Loaded embeddings with shape: (5000, 1536)
Each paper is represented by a 1536-dimensional vector

Papers per category:
category
cs.CL    1000
cs.CV    1000
cs.DB    1000
cs.LG    1000
cs.SE    1000
Name: count, dtype: int64

Sample paper:
Title: Optimizing Mixture of Block Attention
Category: cs.LG
Abstract: Mixture of Block Attention (MoBA) (Lu et al., 2025) is a promising building block for efficiently processing long contexts in LLMs by enabling queries to sparsely attend to a small subset of key-value...

We now have 5,000 papers with embeddings, perfectly balanced across five categories. Each embedding is 1536 dimensions, and papers and embeddings match exactly.

Your First ChromaDB Collection

A collection in ChromaDB is like a table in a traditional database. It stores embeddings along with associated metadata and provides methods for querying. Let's create our first collection:

import chromadb

# Initialize ChromaDB in-memory client (data only exists while script runs)
client = chromadb.Client()

# Create a collection
collection = client.create_collection(
    name="arxiv_papers",
    metadata={"description": "5000 arXiv papers from computer science"}
)

print(f"Created collection: {collection.name}")
print(f"Collection count: {collection.count()}")
Created collection: arxiv_papers
Collection count: 0

The collection starts empty. Now let's add our embeddings. But here's something critical you need to know: production systems always batch operations, for reasons of memory efficiency, error handling, progress tracking, and the ability to process datasets larger than RAM. ChromaDB reinforces this best practice by enforcing a version-dependent maximum batch size per add() call (approximately 5,461 embeddings in ChromaDB 1.3.4).

Rather than viewing this as a limitation, think of it as ChromaDB nudging you toward production-ready patterns from day one. Let's implement proper batching:

# Prepare the data for ChromaDB
# ChromaDB wants: IDs, embeddings, metadata, and optional documents
ids = [f"paper_{i}" for i in range(len(df))]
metadatas = [
    {
        "title": row['title'],
        "category": row['category'],
        "year": int(str(row['published'])[:4]),  # Store year as integer for filtering
        "authors": row['authors'][:100] if len(row['authors']) <= 100 else row['authors'][:97] + "..."
    }
    for _, row in df.iterrows()
]
documents = df['abstract'].tolist()

# Insert in batches to respect the ~5,461 embedding limit
batch_size = 5000  # Safe batch size well under the limit
print(f"Inserting {len(embeddings)} embeddings in batches of {batch_size}...")

for i in range(0, len(embeddings), batch_size):
    batch_end = min(i + batch_size, len(embeddings))
    print(f"  Batch {i//batch_size + 1}: Adding papers {i} to {batch_end}")

    collection.add(
        ids=ids[i:batch_end],
        embeddings=embeddings[i:batch_end].tolist(),
        metadatas=metadatas[i:batch_end],
        documents=documents[i:batch_end]
    )

print(f"\nCollection now contains {collection.count()} papers")
Inserting 5000 embeddings in batches of 5000...
  Batch 1: Adding papers 0 to 5000

Collection now contains 5000 papers

Since our dataset has exactly 5,000 papers, we can add them all in one batch. But this batching pattern is essential knowledge because:

  1. If we had 8,000 or 10,000 papers, we'd need multiple batches
  2. Production systems always batch operations for efficiency
  3. It's good practice to think in batches from the start

The metadata we're storing (title, category, year, authors) will enable filtered searches later. ChromaDB stores this alongside each embedding, making it instantly available when we query.

Your First Vector Similarity Query

Now comes the exciting part: searching our collection using semantic similarity. But first, we need to address something critical: queries need to use the same embedding model as the documents.

If you mix models—say, querying Cohere embeddings with OpenAI embeddings—you'll either get dimension mismatch errors or, if the dimensions happen to align, results that are... let's call them "creatively unpredictable." The rankings won't reflect actual semantic similarity, making your search effectively random.

Our collection contains Cohere embeddings (1536 dimensions), so we'll use Cohere for queries too. Let's set it up:

from cohere import ClientV2
from dotenv import load_dotenv
import os

# Load your Cohere API key
load_dotenv()
cohere_api_key = os.getenv('COHERE_API_KEY')

if not cohere_api_key:
    raise ValueError(
        "COHERE_API_KEY not found. Make sure you have a .env file with your API key."
    )

co = ClientV2(api_key=cohere_api_key)
print("✓ Cohere API key loaded")

Now let's query for papers about neural network training:

# First, embed the query using Cohere (same model as our documents)
query_text = "neural network training and optimization techniques"

response = co.embed(
    texts=[query_text],
    model='embed-v4.0',
    input_type='search_query',
    embedding_types=['float']
)
query_embedding = np.array(response.embeddings.float_[0])

print(f"Query: '{query_text}'")
print(f"Query embedding shape: {query_embedding.shape}")

# Now search the collection
results = collection.query(
    query_embeddings=[query_embedding.tolist()],
    n_results=5
)

# Display the results
print(f"\nTop 5 most similar papers:")
print("=" * 80)

for i in range(len(results['ids'][0])):
    paper_id = results['ids'][0][i]
    distance = results['distances'][0][i]
    metadata = results['metadatas'][0][i]

    print(f"\n{i+1}. {metadata['title']}")
    print(f"   Category: {metadata['category']} | Year: {metadata['year']}")
    print(f"   Distance: {distance:.4f}")
    print(f"   Abstract: {results['documents'][0][i][:150]}...")
Query: 'neural network training and optimization techniques'
Query embedding shape: (1536,)

Top 5 most similar papers:
================================================================================

1. Training Neural Networks at Any Scale
   Category: cs.LG | Year: 2025
   Distance: 1.1162
   Abstract: This article reviews modern optimization methods for training neural networks with an emphasis on efficiency and scale. We present state-of-the-art op...

2. On the Convergence of Overparameterized Problems: Inherent Properties of the Compositional Structure of Neural Networks
   Category: cs.LG | Year: 2025
   Distance: 1.2571
   Abstract: This paper investigates how the compositional structure of neural networks shapes their optimization landscape and training dynamics. We analyze the g...

3. A Distributed Training Architecture For Combinatorial Optimization
   Category: cs.LG | Year: 2025
   Distance: 1.3027
   Abstract: In recent years, graph neural networks (GNNs) have been widely applied in tackling combinatorial optimization problems. However, existing methods stil...

4. Adam symmetry theorem: characterization of the convergence of the stochastic Adam optimizer
   Category: cs.LG | Year: 2025
   Distance: 1.3254
   Abstract: Beside the standard stochastic gradient descent (SGD) method, the Adam optimizer due to Kingma & Ba (2014) is currently probably the best-known optimi...

5. Distribution-Aware Tensor Decomposition for Compression of Convolutional Neural Networks
   Category: cs.CV | Year: 2025
   Distance: 1.3430
   Abstract: Neural networks are widely used for image-related tasks but typically demand considerable computing power. Once a network has been trained, however, i...

Let's talk about what we're seeing here. The results show exactly what we want:

The top 4 papers are all cs.LG (Machine Learning) and directly discuss neural network training, optimization, convergence, and the Adam optimizer. The 5th result is from Computer Vision but discusses neural network compression, which is still topically relevant.

The distances range from 1.12 to 1.34, which corresponds to cosine similarities of about 0.44 to 0.33. While these aren't the 0.8+ scores you might see in highly specialized single-domain datasets, they represent solid semantic matches for a multi-domain collection.

This is the reality of production vector search: Modern research papers share significant vocabulary overlap across fields. ML terminology appears in computer vision, NLP, databases, and software engineering papers. What we get is a ranking system that consistently surfaces relevant papers at the top, even if absolute similarity scores are moderate.

Why did we manually embed the query? Because our collection contains Cohere embeddings (1536 dimensions), queries must also use Cohere embeddings. If we tried using ChromaDB's default embedding model (all-MiniLM-L6-v2, which produces 384-dimensional vectors), we'd get a dimension mismatch error. Query embeddings and document embeddings must come from the same model. This is a fundamental rule in vector search.

About those distance values: ChromaDB uses squared L2 distance by default. For normalized embeddings (like Cohere's), there's a mathematical relationship: distance ≈ 2(1 - cosine_similarity). So a distance of 1.16 corresponds to a cosine similarity of about 0.42. That might seem low compared to theoretical maximums, but it's typical for real-world multi-domain datasets where vocabulary overlaps significantly.
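
To make that relationship concrete, here's a quick check you can run on the results from the query above. It relies on the embeddings being unit-normalized, which is the assumption stated for Cohere's embeddings here:

# Convert ChromaDB's squared-L2 distances into approximate cosine similarities.
# For unit-length vectors: squared L2 distance = 2 * (1 - cosine similarity).
distances = np.array(results['distances'][0])
approx_cosine = 1 - distances / 2

for dist, sim in zip(distances, approx_cosine):
    print(f"distance={dist:.4f}  ->  cosine similarity ~ {sim:.3f}")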

Understanding What Just Happened

Let's break down what occurred behind the scenes:

1. Query Embedding
We explicitly embedded our query text using Cohere's API (the same model that generated our document embeddings). This is crucial because ChromaDB doesn't know or care what embedding model you used. It just stores vectors and calculates distances. If query embeddings don't match document embeddings (same model, same dimensions), search results will be garbage.

2. HNSW Index
ChromaDB uses an algorithm called HNSW (Hierarchical Navigable Small World) to organize embeddings. Think of HNSW as building a multi-level map of the vector space. Instead of checking all 5,000 papers, it uses this map to quickly navigate to the most promising regions.

3. Approximate Search
HNSW is an approximate nearest neighbor algorithm. It doesn't guarantee finding the absolute closest papers, but it finds very close papers extremely quickly. For most applications, this trade-off between perfect accuracy and blazing speed is worth it.

4. Distance Calculation
ChromaDB returns distances between the query and each result. By default, it uses squared Euclidean distance (L2), where lower values mean higher similarity. This is different from the cosine similarity we used in the embeddings series, but both metrics work well for comparing embeddings.

We'll explore HNSW in more depth later, but for now, the key insight is: ChromaDB doesn't check every single paper. It uses a smart index to jump directly to relevant regions of the vector space.

Why We're Storing Metadata

You might have noticed we're storing title, category, year, and authors as metadata alongside each embedding. While we won't use this metadata in this tutorial, we're setting it up now for future tutorials where we'll explore powerful combinations: filtering by metadata (category, year, author) and hybrid search approaches that combine semantic similarity with keyword matching.

For now, just know that ChromaDB stores this metadata efficiently alongside embeddings, and it becomes available in query results without any performance penalty.
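
As a small preview of what that metadata enables (we'll cover filtering properly in a later tutorial), ChromaDB's query method accepts a where argument that restricts results by metadata fields. The filter values below are illustrative:

# Only consider Machine Learning papers from 2025 onward (illustrative filter)
filtered = collection.query(
    query_embeddings=[query_embedding.tolist()],
    n_results=5,
    where={"$and": [{"category": "cs.LG"}, {"year": {"$gte": 2025}}]}
)

for meta in filtered['metadatas'][0]:
    print(f"[{meta['category']} {meta['year']}] {meta['title']}")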

The Performance Question: When Does ChromaDB Actually Help?

Now let's address the big question: when is ChromaDB actually faster than just using NumPy? Let's run a head-to-head comparison at our 5,000-paper scale.

First, let's implement the NumPy brute-force approach (what we built in the embeddings series):

from sklearn.metrics.pairwise import cosine_similarity
import time

def numpy_search(query_embedding, embeddings, top_k=5):
    """Brute-force similarity search using NumPy"""
    # Calculate cosine similarity between query and all papers
    similarities = cosine_similarity(
        query_embedding.reshape(1, -1),
        embeddings
    )[0]

    # Get top k indices
    top_indices = np.argsort(similarities)[::-1][:top_k]

    return top_indices

# Generate a query embedding (using one of our paper embeddings as a proxy)
query_embedding = embeddings[0]

# Test NumPy approach
start_time = time.time()
for _ in range(100):  # Run 100 queries to get stable timing
    top_indices = numpy_search(query_embedding, embeddings, top_k=5)
numpy_time = (time.time() - start_time) / 100 * 1000  # Convert to milliseconds

print(f"NumPy brute-force search (5000 papers): {numpy_time:.2f} ms per query")
NumPy brute-force search (5000 papers): 110.71 ms per query

Now let's compare with ChromaDB:

# Test ChromaDB approach (query using the embedding directly)
start_time = time.time()
for _ in range(100):
    results = collection.query(
        query_embeddings=[query_embedding.tolist()],
        n_results=5
    )
chromadb_time = (time.time() - start_time) / 100 * 1000

print(f"ChromaDB search (5000 papers): {chromadb_time:.2f} ms per query")
print(f"\nSpeedup: {numpy_time / chromadb_time:.1f}x faster")
ChromaDB search (5000 papers): 2.99 ms per query

Speedup: 37.0x faster

ChromaDB is 37x faster at 5,000 papers. That's the difference between a query taking 111ms and one taking 3ms. Let's visualize how this scales:

import matplotlib.pyplot as plt

# Scaling data based on actual 5k benchmark
# NumPy scales linearly (110.71ms / 5000 = 0.022142 ms per paper)
# ChromaDB stays flat due to HNSW indexing
dataset_sizes = [500, 1000, 2000, 5000, 8000, 10000]
numpy_times = [11.1, 22.1, 44.3, 110.7, 177.1, 221.4]  # ms (extrapolated from 5k benchmark)
chromadb_times = [3.0, 3.0, 3.0, 3.0, 3.0, 3.0]  # ms (stays constant)

plt.figure(figsize=(10, 6))
plt.plot(dataset_sizes, numpy_times, 'o-', linewidth=2, markersize=8,
         label='NumPy (Brute Force)', color='#E63946')
plt.plot(dataset_sizes, chromadb_times, 's-', linewidth=2, markersize=8,
         label='ChromaDB (HNSW)', color='#2A9D8F')

plt.xlabel('Number of Papers', fontsize=12)
plt.ylabel('Query Time (milliseconds)', fontsize=12)
plt.title('Vector Search Performance: NumPy vs ChromaDB',
          fontsize=14, fontweight='bold', pad=20)
plt.legend(loc='upper left', fontsize=11)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# Calculate speedup at different scales
print("\nSpeedup at different dataset sizes:")
for size, numpy, chroma in zip(dataset_sizes, numpy_times, chromadb_times):
    speedup = numpy / chroma
    print(f"  {size:5d} papers: {speedup:5.1f}x faster")

(Chart: Vector Search Performance, NumPy vs ChromaDB)

Speedup at different dataset sizes:
    500 papers:   3.7x faster
   1000 papers:   7.4x faster
   2000 papers:  14.8x faster
   5000 papers:  36.9x faster
   8000 papers:  59.0x faster
  10000 papers:  73.8x faster

Note: These benchmarks were measured on a standard development machine with Python 3.12.12. Your actual query times will vary based on hardware, but the relative performance characteristics (flat scaling for ChromaDB vs linear for NumPy) will remain consistent.

This chart tells a clear story:

NumPy's time grows linearly with dataset size. Double the papers, double the query time. That's because brute-force search checks every single embedding.

ChromaDB's time stays flat regardless of dataset size. Whether we have 500 papers or 10,000 papers, queries take about 3ms in our benchmarks. These timings are illustrative (extrapolated from our 5k test on a standard development machine) and will vary based on your hardware and index configuration—but the core insight holds: ChromaDB query time stays relatively flat as your dataset grows, unlike NumPy's linear scaling.

The break-even point is around 1,000-2,000 papers. Below that, the overhead of maintaining an index might not be worth it. Above that, ChromaDB provides clear advantages that grow with scale.

Understanding HNSW: The Magic Behind Fast Queries

We've seen that ChromaDB is dramatically faster than brute-force search, but how does HNSW make this possible? Let's build intuition without diving into complex math.

The Basic Idea: Navigable Small Worlds

Imagine you're in a massive library looking for books similar to one you're holding. A brute-force approach would be to check every single book on every shelf. HNSW is like having a smart navigation system:

Layer 0 (Ground Level): Contains all embeddings, densely connected to nearby neighbors

Layer 1: Contains a subset of embeddings with longer-range connections

Layer 2: Even fewer embeddings with even longer connections

Layer 3: The top layer with just a few embeddings spanning the entire space

When we query, HNSW starts at the top layer (with very few points) and quickly narrows down to promising regions. Then it drops to the next layer and refines. By the time it reaches the ground layer, it's already in the right neighborhood and only needs to check a small fraction of the total embeddings.
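
To make the navigable-graph idea concrete, here's a toy, self-contained sketch (a simplified illustration, not ChromaDB's actual implementation, and single-layer rather than hierarchical): build a small proximity graph over random 2D points, then greedily walk toward a query, checking only each node's neighbors instead of every point.

import numpy as np

rng = np.random.default_rng(42)
points = rng.random((200, 2))  # toy 2D "embeddings"

# Build a simple proximity graph: connect each point to its 8 nearest neighbors
pairwise = np.linalg.norm(points[:, None, :] - points[None, :, :], axis=-1)
neighbors = np.argsort(pairwise, axis=1)[:, 1:9]  # column 0 is the point itself

def greedy_search(query, entry=0):
    """Walk the graph, always moving to the neighbor closest to the query."""
    current, checked = entry, 0
    while True:
        candidates = neighbors[current]
        checked += len(candidates)
        best = min(candidates, key=lambda idx: np.linalg.norm(points[idx] - query))
        if np.linalg.norm(points[best] - query) >= np.linalg.norm(points[current] - query):
            return current, checked  # no neighbor is closer: stop at this local optimum
        current = best

query = rng.random(2)
found, checked = greedy_search(query)
exact = int(np.argmin(np.linalg.norm(points - query, axis=1)))
print(f"Greedy found point {found} after ~{checked} distance checks; exact nearest is {exact}")

HNSW's extra layers give this walk long-range "highways" so it starts near the right region, and its ef_search parameter keeps a list of candidates rather than a single point, which is why real recall is far higher than this toy suggests.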

The Trade-off: Accuracy vs Speed

HNSW is an approximate algorithm. It doesn't guarantee finding the absolute closest papers, but it finds very close papers very quickly. This trade-off is controlled by parameters:

  • ef_construction: How carefully the index is built (higher = better quality, slower build)
  • ef_search: How thoroughly queries search (higher = better recall, slower queries)
  • M: Number of connections per point (higher = better search, more memory)

ChromaDB uses sensible defaults that work well for most applications. Let's verify the quality of approximate search:

# Compare ChromaDB results to exact NumPy results
query_embedding = embeddings[100]

# Get top 10 from NumPy (exact)
numpy_results = numpy_search(query_embedding, embeddings, top_k=10)

# Get top 10 from ChromaDB (approximate)
chromadb_results = collection.query(
    query_embeddings=[query_embedding.tolist()],
    n_results=10
)

# Extract paper indices from ChromaDB results (convert "paper_123" to 123)
chromadb_indices = [int(id.split('_')[1]) for id in chromadb_results['ids'][0]]

# Calculate overlap
overlap = len(set(numpy_results) & set(chromadb_indices))

print(f"NumPy top 10 (exact): {numpy_results}")
print(f"ChromaDB top 10 (approximate): {chromadb_indices}")
print(f"\nOverlap: {overlap}/10 papers match")
print(f"Recall@10: {overlap/10*100:.1f}%")
NumPy top 10 (exact): [ 100  984  509 2261 3044  701 1055  830 3410 1311]
ChromaDB top 10 (approximate): [100, 984, 509, 2261, 3044, 701, 1055, 830, 3410, 1311]

Overlap: 10/10 papers match
Recall@10: 100.0%

With default settings, ChromaDB achieves 100% recall on this query, meaning it found exactly the same top 10 papers as the exact brute-force search. This high accuracy is typical for the dataset sizes we're working with. The approximate nature of HNSW becomes more noticeable at massive scales (millions of vectors), but even then, the quality is excellent for most applications.
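
If you want more confidence than a single spot check, you can average Recall@10 over several queries. Here's a minimal sketch reusing the numpy_search function and collection from above; the sampled paper indices are arbitrary:

# Average Recall@10 across a handful of papers used as stand-in queries
sample_indices = [0, 500, 1500, 3000, 4500]
recalls = []

for idx in sample_indices:
    q = embeddings[idx]
    exact_top10 = set(numpy_search(q, embeddings, top_k=10))
    approx = collection.query(query_embeddings=[q.tolist()], n_results=10)
    approx_top10 = {int(pid.split('_')[1]) for pid in approx['ids'][0]}
    recalls.append(len(exact_top10 & approx_top10) / 10)

print(f"Average Recall@10 over {len(sample_indices)} queries: {np.mean(recalls) * 100:.1f}%")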

Memory Usage and Resource Requirements

ChromaDB keeps its HNSW index in memory for fast access. Let's measure how much RAM our 5,000-paper collection uses:

# Estimate memory usage
embedding_memory = embeddings.nbytes / (1024 ** 2)  # Convert to MB

print(f"Memory usage estimates:")
print(f"  Raw embeddings: {embedding_memory:.1f} MB")
print(f"  HNSW index overhead: ~{embedding_memory * 0.5:.1f} MB (estimated)")
print(f"  Total (approximate): ~{embedding_memory * 1.5:.1f} MB")
Memory usage estimates:
  Raw embeddings: 58.6 MB
  HNSW index overhead: ~29.3 MB (estimated)
  Total (approximate): ~87.9 MB

For 5,000 papers with 1536-dimensional embeddings, we're looking at roughly 90-100MB of RAM. This scales linearly: 10,000 papers would be about 180-200MB, 50,000 papers about 900MB-1GB.

This is completely manageable for modern computers. Even a basic laptop can easily handle collections with tens of thousands of documents. The memory requirements only become a concern at massive scales (hundreds of thousands or millions of vectors), which is when you'd move to production vector databases designed for distributed deployment.
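
If you want to sanity-check memory for your own dataset sizes, here's a rough back-of-envelope helper using the same assumed ~50% index overhead as above (actual usage depends on the embedding dtype, since float32 storage halves these numbers, and on HNSW parameters):

def estimate_memory_mb(num_docs, dims=1536, bytes_per_value=8, index_overhead=0.5):
    """Rough RAM estimate: raw embeddings plus an assumed HNSW overhead fraction."""
    raw_mb = num_docs * dims * bytes_per_value / (1024 ** 2)
    return raw_mb * (1 + index_overhead)

for n in [5_000, 10_000, 50_000, 100_000]:
    print(f"{n:>7,} docs: ~{estimate_memory_mb(n):.0f} MB")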

Important ChromaDB Behaviors to Know

Before we move on, let's cover some important behaviors that will save you debugging time:

1. In-Memory vs Persistent Storage

Our code uses chromadb.Client(), which creates an in-memory client. The collection only exists while the Python script runs. When the script ends, the data disappears.

For persistent storage, use:

# Persistent storage (data saved to disk)
client = chromadb.PersistentClient(path="./chroma_db")

This saves the collection to a local directory. Next time you run the script, the data will still be there.
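
For example, a later script can reconnect to the same on-disk collection like this (a sketch; keep any collection metadata and embedding settings consistent across runs):

import chromadb

# Reopen the persistent store and fetch the existing collection (or create it if missing)
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection(name="arxiv_papers")
print(f"Collection contains {collection.count()} papers")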

2. Collection Deletion and Index Growth

ChromaDB's HNSW index grows but never shrinks. If we add 5,000 documents then delete 4,000, the index still uses memory for 5,000. The only way to reclaim this space is to create a new collection and re-add the documents we want to keep.

This is a known limitation with HNSW indexes. It's not a bug, it's a fundamental trade-off for the algorithm's speed. Keep this in mind when designing systems that frequently add and remove documents.
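
If you do need to reclaim space, the rebuild pattern looks roughly like this sketch (for large collections you would page through collection.get with limit/offset and batch the re-add, as we did earlier):

# Pull everything we want to keep out of the old collection
kept = collection.get(include=["embeddings", "metadatas", "documents"])

# Drop the bloated collection and recreate it fresh
client.delete_collection(name="arxiv_papers")
collection = client.create_collection(name="arxiv_papers")

# Re-add the surviving records (batch this for datasets above the add() limit)
collection.add(
    ids=kept["ids"],
    embeddings=kept["embeddings"],
    metadatas=kept["metadatas"],
    documents=kept["documents"],
)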

3. Batch Size Limits

Remember the ~5,461 embedding limit per add() call? This isn't ChromaDB being difficult; it's protecting you from overwhelming the system. Always batch your insertions in production systems.

4. Default Embedding Function

When you call collection.query(query_texts=["some text"]), ChromaDB automatically embeds your query using its default model (all-MiniLM-L6-v2). This is convenient but might not match the embeddings you added to the collection.

For production systems, you typically want to:

  • Use the same embedding model for queries and documents
  • Either embed queries yourself and use query_embeddings, or configure ChromaDB's embedding function to match your model

Comparing Results: Query Understanding

Let's run a few different queries to see how well vector search understands intent:

queries = [
    "machine learning model evaluation metrics",
    "how do convolutional neural networks work",
    "SQL query optimization techniques",
    "testing and debugging software systems"
]

for query in queries:
    # Embed the query
    response = co.embed(
        texts=[query],
        model='embed-v4.0',
        input_type='search_query',
        embedding_types=['float']
    )
    query_embedding = np.array(response.embeddings.float_[0])

    # Search
    results = collection.query(
        query_embeddings=[query_embedding.tolist()],
        n_results=3
    )

    print(f"\nQuery: '{query}'")
    print("-" * 80)

    categories = [meta['category'] for meta in results['metadatas'][0]]
    titles = [meta['title'] for meta in results['metadatas'][0]]

    for i, (cat, title) in enumerate(zip(categories, titles)):
        print(f"{i+1}. [{cat}] {title[:60]}...")
Query: 'machine learning model evaluation metrics'
--------------------------------------------------------------------------------
1. [cs.CL] Factual and Musical Evaluation Metrics for Music Language Mo...
2. [cs.DB] GeoSQL-Eval: First Evaluation of LLMs on PostGIS-Based NL2Ge...
3. [cs.SE] GeoSQL-Eval: First Evaluation of LLMs on PostGIS-Based NL2Ge...

Query: 'how do convolutional neural networks work'
--------------------------------------------------------------------------------
1. [cs.LG] Covariance Scattering Transforms...
2. [cs.CV] Elements of Active Continuous Learning and Uncertainty Self-...
3. [cs.CV] Convolutional Fully-Connected Capsule Network (CFC-CapsNet):...

Query: 'SQL query optimization techniques'
--------------------------------------------------------------------------------
1. [cs.DB] LLM4Hint: Leveraging Large Language Models for Hint Recommen...
2. [cs.DB] Including Bloom Filters in Bottom-up Optimization...
3. [cs.DB] Query Optimization in the Wild: Realities and Trends...

Query: 'testing and debugging software systems'
--------------------------------------------------------------------------------
1. [cs.SE] Enhancing Software Testing Education: Understanding Where St...
2. [cs.SE] Design and Implementation of Data Acquisition and Analysis S...
3. [cs.SE] Identifying Video Game Debugging Bottlenecks: An Industry Pe...

Notice how the search correctly identifies the topic for each query:

  • ML evaluation → Machine Learning and evaluation-related papers
  • CNNs → Computer Vision papers with one ML paper
  • SQL optimization → Database papers
  • Testing → Software Engineering papers

The system understands semantic meaning. Even when queries use natural language phrasing like "how do X work," it finds topically relevant papers. The rankings are what matter: relevant papers consistently appear at the top, even if absolute similarity scores are moderate.

When ChromaDB Is Enough vs When You Need More

We now have a working vector database running on our laptop. But when is ChromaDB sufficient, and when do you need a production database like Pinecone, Qdrant, or Weaviate?

ChromaDB is perfect for:

  • Learning and prototyping: Get immediate feedback without infrastructure setup
  • Local development: No internet required, no API costs
  • Small to medium datasets: Up to 100,000 documents on a standard laptop
  • Single-machine applications: Desktop tools, local RAG systems, personal assistants
  • Rapid experimentation: Test different embedding models or chunking strategies

Move to production databases when you need:

  • Massive scale: Millions of vectors or high query volume (thousands of QPS)
  • Distributed deployment: Multiple machines, load balancing, high availability
  • Advanced features: Hybrid search, multi-tenancy, access control, backup/restore
  • Production SLAs: Guaranteed uptime, support, monitoring
  • Team collaboration: Multiple developers working with shared data

We'll explore production databases in a later tutorial. For now, ChromaDB gives us everything we need to learn the core concepts and build impressive projects.

Practical Exercise: Exploring Your Own Queries

Before we wrap up, try experimenting with different queries:

# Helper function to make querying easier
def search_papers(query_text, n_results=5):
    """Search papers using semantic similarity"""
    # Embed the query
    response = co.embed(
        texts=[query_text],
        model='embed-v4.0',
        input_type='search_query',
        embedding_types=['float']
    )
    query_embedding = np.array(response.embeddings.float_[0])

    # Search
    results = collection.query(
        query_embeddings=[query_embedding.tolist()],
        n_results=n_results
    )

    return results

# Your turn: try these queries and examine the results

# 1. Find papers about a specific topic
results = search_papers("reinforcement learning and robotics")

# 2. Try a different domain
results_cv = search_papers("image segmentation techniques")

# 3. Test with a broad query
results_broad = search_papers("deep learning applications")

# Examine the results for each query
# What patterns do you notice?
# Do the results make sense for each query?

Some things to explore:

  • Query phrasing: Does "neural networks" return different results than "deep learning" or "artificial neural networks"?
  • Specificity: How do very specific queries ("BERT model fine-tuning") compare to broad queries ("natural language processing")?
  • Cross-category topics: What happens when you search for topics that span multiple categories, like "machine learning for databases"?
  • Result quality: Look at the categories and distances. Do the most similar papers make sense for each query?

This hands-on exploration will deepen your intuition about how vector search works and what to expect in real applications.

What You've Learned

We've built a complete vector database from scratch and understand the fundamentals:

Core Concepts:

  • Vector databases use ANN indexes (like HNSW) to search large collections efficiently
  • ChromaDB provides a simple, local database perfect for learning and prototyping
  • Collections store embeddings, metadata, and documents together
  • Batch insertion is required due to size limits (around 5,461 embeddings per call)

Performance Characteristics:

  • ChromaDB achieves 37x speedup over NumPy at 5,000 papers
  • Query time stays constant regardless of dataset size (around 3ms)
  • Break-even point is around 1,000-2,000 papers
  • Memory usage is manageable (about 90MB for 5,000 papers)

Practical Skills:

  • Loading pre-generated embeddings and metadata
  • Creating and querying ChromaDB collections
  • Running pure vector similarity searches
  • Comparing approximate vs exact search quality
  • Understanding when to use ChromaDB vs production databases

Critical Insights:

  • HNSW trades perfect accuracy for massive speed gains
  • Default settings achieve excellent recall for typical workloads
  • In-memory storage makes ChromaDB fast but limits persistence
  • Batching is not optional, it's a required pattern
  • Modern multi-domain datasets show moderate similarity scores due to vocabulary overlap
  • Query embeddings and document embeddings must use the same model

What's Next

We now have a vector database running locally with 5,000 papers. Next, we'll tackle a critical challenge: document chunking strategies.

Right now, we're searching entire paper abstracts as single units. But what if we want to search through full papers, documentation, or long articles? We need to break them into chunks, and how we chunk dramatically affects search quality.

The next tutorial will teach you:

  • Why chunking matters even with long-context LLMs in 2025
  • Different chunking strategies (sentence-based, token windows, structure-aware)
  • How to evaluate chunking quality using Recall@k
  • The trade-offs between chunk size, overlap, and search performance
  • Practical implementations you can use in production

Before moving on, make sure you understand these core concepts:

  • How vector similarity search works
  • What HNSW indexing does and why it's fast
  • When ChromaDB provides real advantages over brute-force search
  • How query and document embeddings must match

When you're comfortable with vector search basics, you’re ready to see how to handle real documents that are too long to embed as single units.


Key Takeaways:

  • Vector databases use approximate nearest neighbor algorithms (like HNSW) to search large collections in constant time
  • ChromaDB provides 37x speedup over NumPy brute-force at 5,000 papers, with query times staying flat as datasets grow
  • Batch insertion is mandatory due to the embedding limit per add() call
  • HNSW creates a hierarchical navigation structure that checks only a fraction of embeddings while maintaining high accuracy
  • Default HNSW settings achieve excellent recall for typical datasets
  • Memory usage scales linearly (about 90MB for 5,000 papers with 1536-dimensional embeddings)
  • ChromaDB excels for learning, prototyping, and datasets up to ~100,000 documents on standard hardware
  • The break-even point for vector databases vs brute-force is around 1,000-2,000 documents
  • HNSW indexes grow but never shrink, requiring collection re-creation to reclaim space
  • In-memory storage provides speed but requires persistent client for data that survives script restarts
  • Modern multi-domain datasets show moderate similarity scores (0.3-0.5 cosine) due to vocabulary overlap across fields
  • Query embeddings and document embeddings must use the same model and dimensionality

Automating Amazon Book Data Pipelines with Apache Airflow and MySQL

25 November 2025 at 21:51

In our previous tutorial, we simulated market data pipelines to explore the full ETL lifecycle in Apache Airflow, from extracting and transforming data to loading it into a local database. Along the way, we integrated Git-based DAG management, automated validation through GitHub Actions, and synchronized our Airflow environment using git-sync, creating a workflow that closely mirrored real production setups.

Now, we’re taking things a step further by moving from simulated data to a real-world use case.

Imagine you’ve been given the task of finding the best engineering books on Amazon, extracting their titles, authors, prices, and ratings, and organizing all that information into a clean, structured table for analysis. Since Amazon’s listings change frequently, we need an automated workflow to fetch the latest data on a regular schedule. By orchestrating this extraction with Airflow, our pipeline can run every 24 hours, ensuring the dataset always reflects the most recent updates.

In this tutorial, you’ll take your Airflow skills beyond simulation and build a real-world ETL pipeline that extracts engineering book data from Amazon, transforms it with Python, and loads it into MySQL for structured analysis. You’ll orchestrate the process using Airflow’s TaskFlow API and custom operators for clean, modular design, while integrating GitHub Actions for version-controlled CI/CD deployment. To complete the setup, you’ll implement logging and monitoring so every stage, from extraction to loading, is tracked with full visibility and accuracy.

By the end, you’ll have a production-like pattern, an Airflow workflow that not only automates the extraction of Amazon book data but also demonstrates best practices in reliability, maintainability, and DevOps-driven orchestration.

Setting Up the Environment and Designing the ETL Pipeline

Setting Up the Environment

We have seen in our previous tutorial how running Airflow inside Docker provides a clean, portable, and reproducible setup for development. For this project, we’ll follow the same approach.

We’ve prepared a GitHub repository to help you get your environment up and running quickly. It includes the starter files you’ll need for this tutorial.

Begin by cloning the repository:

git clone [email protected]:dataquestio/tutorials.git

Then navigate to the Airflow tutorial directory:

cd airflow-docker-tutorial

Inside, you’ll find a structure similar to this:

airflow-docker-tutorial/
├── part-one/
├── part-two/
├── amazon-etl/
├── docker-compose.yaml
└── README.md

The part-one/ and part-two/ folders contain the complete reference files for our previous tutorials, while the amazon-etl/ folder is the workspace for this project; it contains all the DAGs, helper scripts, and configuration files we’ll build in this lesson. You don’t need to modify anything in the reference folders; they’re just there for review and comparison.

Your starting point is the docker-compose.yaml file, which defines the Airflow services. We’ve already seen how this file manages the Airflow api-server, scheduler, and supporting components.

Next, Airflow expects certain directories to exist before launching. Create them inside the same directory as your docker-compose.yaml file:

mkdir -p ./dags ./logs ./plugins ./config

Now, add a .env file in the same directory with the following line:

AIRFLOW_UID=50000

This ensures consistent file ownership between your host system and Docker containers.

For Linux users, you can generate this automatically with:

echo -e "AIRFLOW_UID=$(id -u)" > .env

Finally, initialize your Airflow metadata database:

docker compose up airflow-init

Make sure your Docker Desktop is already running before executing the command. Once initialization completes, bring up your Airflow environment:

docker compose up -d

If everything is set up correctly, you’ll see all Airflow containers running, including the api-server, which exposes the Airflow UI at http://localhost:8080. Open it in your browser and confirm that your environment is running smoothly by logging in with airflow as the username and airflow as the password.

Designing the ETL Pipeline

Now that your environment is ready, it’s time to plan the structure of your ETL workflow before writing any code. Good data engineering practice begins with design, not implementation.

The first step is understanding the data flow, specifically, your source and destination.

In our case:

  • The source is Amazon’s public listings for data engineering books.
  • The destination is a MySQL database where we’ll store the extracted and transformed data for easy access and analysis.

The workflow will consist of three main stages:

  1. Extract – Scrape book information (titles, authors, prices, ratings) from Amazon pages.
  2. Transform – Clean and format the raw text into structured, numeric fields using Python and pandas.
  3. Load – Insert the processed data into a MySQL table for further use.

At a high level, our data pipeline will look like this:

Amazon Website (Source)
       ↓
   Extract Task
       ↓
   Transform Task
       ↓
   Load Task
       ↓
   MySQL Database (Destination)

To prepare data for loading into MySQL, we’ll need to convert scraped HTML into a tabular structure. The transformation step will include tasks like normalizing currency symbols, parsing ratings into numeric values, and ensuring all records are unique before loading.

When mapped into Airflow, these steps form a Directed Acyclic Graph (DAG) — a visual and logical representation of our workflow. Each box in the DAG represents a task (extract, transform, or load), and the arrows define their dependencies and execution order.

Here’s a conceptual view of the workflow:

[extract_amazon_books] → [transform_amazon_books] → [load_amazon_books]

Finally, we enhance our setup by adding git-sync for automatic DAG updates and GitHub Actions for CI validation, ensuring every change in GitHub reflects instantly in Airflow while your workflows are continuously checked for issues. By combining git-sync, CI checks, and Airflow’s built-in alerting (email or Slack), the entire Amazon ETL pipeline becomes stable, monitored, and much closer to a production-like orchestration system.

Building an ETL Pipeline with Airflow

Setting Up Your DAG File

Let’s start by creating the foundation of our workflow.

Before making changes, make sure to shut down any running containers to avoid conflicts:

docker compose down

Ensure that you disable the Example DAGs and switch to LocalExecutor, as we did in our previous tutorials.

Now, open your airflow-docker-tutorial project folder and, inside the dags/ directory, create a new file named:

amazon_etl_dag.py

Every .py file inside this directory becomes a workflow that Airflow automatically detects and manages, no manual registration required. Airflow continuously scans the folder for new DAGs and dynamically loads them.

At the top of your file, import the core libraries needed for this project:

from airflow.decorators import dag, task
from datetime import datetime, timedelta
import pandas as pd
import random
import os
import time
import requests
from bs4 import BeautifulSoup

Let’s quickly review what each import does:

  • dag and task come from Airflow’s TaskFlow API, allowing us to define Python functions that become managed tasks; Airflow handles execution, dependencies, and retries automatically.
  • datetime and timedelta handle scheduling logic, such as when the DAG should start and how often it should run.
  • pandas, random, BeautifulSoup, requests, and os are standard Python libraries we’ll use to process and manage our data within the ETL steps.

This minimal setup is all you need to start orchestrating real-world data workflows.

Defining the DAG Structure

With the imports ready, let’s define the core of our workflow, the DAG configuration.

This determines when, how often, and under what conditions your pipeline runs.

default_args = {
    "owner": "Data Engineering Team",
    "retries": 3,
    "retry_delay": timedelta(minutes=2),
}

@dag(
    dag_id="amazon_books_etl_pipeline",
    description="Automated ETL pipeline to fetch and load Amazon Data Engineering book data into MySQL",
    schedule="@daily",
    start_date=datetime(2025, 11, 13),
    catchup=False,
    default_args=default_args,
    tags=["amazon", "etl", "airflow"],
)
def amazon_books_etl():
    ...

dag = amazon_books_etl()

Let’s break down what’s happening here:

  • default_args define reusable settings for all tasks; in this case, Airflow will automatically retry any failed task up to three times, with a two-minute delay between attempts. This is especially useful since our workflow depends on live web requests to Amazon, which can occasionally fail due to rate limits or connectivity issues.
  • The @dag decorator marks this function as an Airflow DAG. Everything inside amazon_books_etl() will form part of one cohesive workflow.
  • schedule="@daily" ensures our DAG runs once every 24 hours, keeping the dataset fresh.
  • start_date defines when Airflow starts scheduling runs.
  • catchup=False prevents Airflow from trying to backfill missed runs.
  • tags categorize the DAG in the Airflow UI for easier filtering.

Finally, the line:

dag = amazon_books_etl()

instantiates the workflow, making it visible and schedulable within Airflow.

Data Extraction with Airflow

With our DAG structure in place, the first step in our ETL pipeline is data extraction — pulling book data directly from Amazon’s live listings.

  • Note that this is for educational purposes: Amazon frequently updates its page structure and uses anti-bot protections, which can break scrapers without warning. In a real production project, we’d rely on official APIs or other stable data sources instead, since they provide consistent data across runs and keep long-term maintenance low.

When you search for something like “data engineering books” on Amazon, the search results page displays listings inside structured HTML containers such as:

<div data-component-type="s-impression-counter">

Each of these containers holds nested elements for the book title, author, price, and rating—information we can reliably parse using BeautifulSoup.

For example, when inspecting any of the listed books, we see a consistent HTML structure that guides how our scraper should behave:

(Screenshot: inspecting the HTML structure of an Amazon book listing)

Because Amazon paginates its results, our extraction logic systematically iterates through the first 10 pages, returning approximately 30 to 50 books. We intentionally limit the extraction to this number to keep the workload manageable while still capturing the most relevant items.

This approach ensures we gather the most visible and actively featured books—those trending or recently updated—rather than scraping random or deeply buried results. By looping through these pages, we create a dataset that is both fresh and representative, striking the right balance between completeness and efficiency.

Even though Amazon updates its listings frequently, our Airflow DAG runs every 24 hours, ensuring the extracted data always reflects the latest marketplace activity.

Here’s the Python logic behind the extraction step:

@task
def get_amazon_data_books(num_books=50, max_pages=10, ti=None):
    """
    Extracts Amazon Data Engineering book details such as Title, Author, Price, and Rating. Saves the raw extracted data locally and pushes it to XCom for downstream tasks.
    """
    headers = {
        "Referer": 'https://www.amazon.com/',
        "Sec-Ch-Ua": "Not_A Brand",
        "Sec-Ch-Ua-Mobile": "?0",
        "Sec-Ch-Ua-Platform": "macOS",
        'User-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36'
    }

    base_url = "https://www.amazon.com/s?k=data+engineering+books"
    books, seen_titles = [], set()
    page = 1  # start with page 1

    while page <= max_pages and len(books) < num_books:
        url = f"{base_url}&page={page}"

        try:
            response = requests.get(url, headers=headers, timeout=15)
        except requests.RequestException as e:
            print(f" Request failed: {e}")
            break

        if response.status_code != 200:
            print(f"Failed to retrieve page {page} (status {response.status_code})")
            break

        soup = BeautifulSoup(response.text, "html.parser")
        book_containers = soup.find_all("div", {"data-component-type": "s-impression-counter"})

        for book in book_containers:
            title_tag = book.select_one("h2 span")
            author_tag = book.select_one("a.a-size-base.a-link-normal")
            price_tag = book.select_one("span.a-price > span.a-offscreen")
            rating_tag = book.select_one("span.a-icon-alt")

            if title_tag and price_tag:
                title = title_tag.text.strip()
                if title not in seen_titles:
                    seen_titles.add(title)
                    books.append({
                        "Title": title,
                        "Author": author_tag.text.strip() if author_tag else "N/A",
                        "Price": price_tag.text.strip(),
                        "Rating": rating_tag.text.strip() if rating_tag else "N/A"
                    })
        if len(books) >= num_books:
            break

        page += 1
        time.sleep(random.uniform(1.5, 3.0))

    # Convert to DataFrame
    df = pd.DataFrame(books)
    df.drop_duplicates(subset="Title", inplace=True)

    # Create directory for raw data.
    # Note: This works here because everything runs in one container.
    # In real deployments, you'd use shared storage (e.g., S3/GCS) instead.
    os.makedirs("/opt/airflow/tmp", exist_ok=True)
    raw_path = "/opt/airflow/tmp/amazon_books_raw.csv"

    # Save the extracted dataset
    df.to_csv(raw_path, index=False)
    print(f"[EXTRACT] Amazon book data successfully saved at {raw_path}")

    # Build a small JSON summary of the extracted data to push to XCom
    import json

    summary = {
        "rows": len(df),
        "columns": list(df.columns),
        "sample": df.head(3).to_dict('records'),
    }

    # Clean up non-breaking spaces and format neatly
    formatted_summary = json.dumps(summary, indent=2, ensure_ascii=False).replace('\xa0', ' ')

    if ti:
        ti.xcom_push(key='df_summary', value=formatted_summary)
        print("[XCOM] Pushed JSON summary to XCom.")

    # Optional preview
    print("\nPreview of Extracted Data:")
    print(df.head(5).to_string(index=False))

    return raw_path

This approach makes your pipeline deterministic and meaningful; it doesn’t rely on arbitrary randomness but on a fixed, observable window of recent and visible listings.

You can run this task on its own and view its logs in the Airflow UI.

(Screenshot: logs from the extraction task in the Airflow UI)

By calling this function inside our amazon_books_etl() DAG function, we create a task that Airflow will schedule and manage:

def amazon_books_etl():
    # Task dependencies
    raw_file = get_amazon_data_books()

dag = amazon_books_etl()

You should also notice that we are pushing a few pieces of information about our extracted data to XCom: the number of rows, the column names, and the first three records. This will help us understand the transformation (including cleaning) our data needs.
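
That summary isn't consumed by the transform task itself (which reads the CSV path returned by the extract step), but any downstream task could pull it from XCom. Here's a hypothetical example, not part of the pipeline we're building, just to show the pattern:

@task
def report_extraction_summary(ti=None):
    """Illustrative task: read the summary the extract task pushed to XCom."""
    summary = ti.xcom_pull(task_ids="get_amazon_data_books", key="df_summary")
    print(f"[REPORT] Extraction summary:\n{summary}")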

Data Transformation with Airflow

Once our raw Amazon book data is extracted and stored, the next step is data transformation — converting the messy, unstructured output into a clean, analysis-ready format.

If we inspect the sample data passed through XCom, it looks something like this:

{
  "rows": 42,
  "sample": [
    {
      "Title": "Data Engineering Foundations: Core Techniques for Data Analysis with Pandas, NumPy, and Scikit-Learn (Advanced Data Analysis Series Book 1)",
      "Author": "Kindle Edition",
      "Price": "$44.90",
      "Rating": "4.2 out of 5 stars"
    }
  ],
  "columns": ["Title", "Author", "Price", "Rating"]
}

We can already notice a few data quality issues:

  • The Price column includes the dollar sign ($) — we’ll remove it and convert prices to numeric values.
  • The Rating column contains text like "4.2 out of 5 stars" — we’ll extract just the numeric part (4.2).
  • The Price column name doesn’t indicate the currency — we’ll rename it to Price($) to make the unit explicit.

Here’s the updated transformation task:

@task
def transform_amazon_books(raw_file: str):
    """
    Standardizes the extracted Amazon book dataset for analysis.
    - Converts price strings (e.g., '$45.99') into numeric values
    - Extracts numeric ratings (e.g., '4.2' from '4.2 out of 5 stars')
    - Renames 'Price' to 'Price($)'
    - Handles missing or unexpected field formats safely
    - Performs light validation after numeric conversion
    """
    if not os.path.exists(raw_file):
        raise FileNotFoundError(f" Raw file not found: {raw_file}")

    df = pd.read_csv(raw_file)
    print(f"[TRANSFORM] Loaded {len(df)} records from raw dataset.")

    # --- Price cleaning (defensive) ---
    if "Price" in df.columns:
        df["Price($)"] = (
            df["Price"]
            .astype(str)                                   # prevents .str on NaN
            .str.replace("$", "", regex=False)
            .str.replace(",", "", regex=False)
            .str.extract(r"(\d+\.?\d*)")[0]                # safely extract numbers
        )
        df["Price($)"] = pd.to_numeric(df["Price($)"], errors="coerce")
    else:
        print("[TRANSFORM] Missing 'Price' column — filling with None.")
        df["Price($)"] = None

    # --- Rating cleaning (defensive) ---
    if "Rating" in df.columns:
        df["Rating"] = (
            df["Rating"]
            .astype(str)
            .str.extract(r"(\d+\.?\d*)")[0]
        )
        df["Rating"] = pd.to_numeric(df["Rating"], errors="coerce")
    else:
        print("[TRANSFORM] Missing 'Rating' column — filling with None.")
        df["Rating"] = None

    # --- Validation: drop rows where BOTH fields failed (optional) ---
    df.dropna(subset=["Price($)", "Rating"], how="all", inplace=True)

    # --- Drop original Price column (if present) ---
    if "Price" in df.columns:
        df.drop(columns=["Price"], inplace=True)

    # --- Save cleaned dataset ---
    transformed_path = raw_file.replace("raw", "transformed")
    df.to_csv(transformed_path, index=False)

    print(f"[TRANSFORM] Cleaned data saved at {transformed_path}")
    print(f"[TRANSFORM] {len(df)} valid records after standardization.")
    print(f"[TRANSFORM] Sample cleaned data:\n{df.head(5).to_string(index=False)}")

    return transformed_path

This transformation ensures that by the time our data reaches the loading stage (MySQL), it’s tidy, consistent, and ready for querying: for instance, quickly finding the highest-rated or most affordable data engineering books.
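
As a quick sanity check before wiring up the database, you could load the transformed CSV (inside the container, or from a local copy) and sort it directly with pandas; the path and column names below match what our transform task produces:

import pandas as pd

books = pd.read_csv("/opt/airflow/tmp/amazon_books_transformed.csv")

print("Top-rated books:")
print(books.sort_values("Rating", ascending=False).head(5)[["Title", "Price($)", "Rating"]])

print("\nMost affordable books:")
print(books.nsmallest(5, "Price($)")[["Title", "Price($)", "Rating"]])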

(Screenshot: logs from the transformation task in the Airflow UI)

  • Note: Although we’re working with real Amazon data, this transformation logic is intentionally simplified for the purposes of this tutorial. Amazon’s page structure can change, and real-world pipelines typically include more robust safeguards, such as retries, stronger validation rules, fallback parsing strategies, and alerting, so that temporary layout changes or missing fields don’t break the entire workflow. The defensive checks added here help keep the DAG stable, but a production deployment would apply additional hardening to handle a broader range of real-world variations.

Data Loading with Airflow

The final step in our ETL pipeline is data loading, moving our transformed dataset into a structured database where it can be queried, analyzed, and visualized.

At this stage, we’ve already extracted live book listings from Amazon and transformed them into clean, numeric-friendly records. Now we’ll store this data in a MySQL database, ensuring that every 24 hours our dataset refreshes with the latest available titles, authors, prices, and ratings.

We’ll use a local MySQL instance for simplicity, but the same logic applies to cloud-hosted databases like Amazon RDS, Google Cloud SQL, or Azure MySQL.

Before proceeding, make sure MySQL is installed and running locally, with a database and user configured as:

CREATE DATABASE airflow_db;
CREATE USER 'airflow'@'%' IDENTIFIED BY 'airflow';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow'@'%';
FLUSH PRIVILEGES;

When Airflow runs in Docker but MySQL runs directly on your Linux host, the containers can’t reach the host’s services through localhost, and the host.docker.internal hostname isn’t available by default on Linux.

To fix this, you need to make your local machine reachable from inside Docker under that hostname.

Open your docker-compose.yaml file and add the following line under the x-airflow-common service definition:

extra_hosts:
  - "host.docker.internal:host-gateway"

Once configured, we can define our load task:

@task
def load_to_mysql(transformed_file: str):
    """
    Loads the transformed Amazon book dataset into a MySQL table for analysis.
    Uses a truncate-and-load pattern to keep the table idempotent.
    """
    import mysql.connector
    import numpy as np

    # Note:
    # For production-ready projects, database credentials should never be hard-coded.
    # Airflow provides a built-in Connection system and can also integrate with
    # secret backends (AWS Secrets Manager, Vault, etc.).
    #
    # Example:
    #     hook = MySqlHook(mysql_conn_id="my_mysql_conn")
    #     conn = hook.get_conn()
    #
    # For this demo, we keep a simple local config:
    db_config = {
        "host": "host.docker.internal",
        "user": "airflow",
        "password": "airflow",
        "database": "airflow_db",
        "port": 3306
    }

    df = pd.read_csv(transformed_file)
    table_name = "amazon_books_data"

    # Replace NaN with None (important for MySQL compatibility)
    df = df.replace({np.nan: None})

    conn = mysql.connector.connect(**db_config)
    cursor = conn.cursor()

    # Create table if it does not exist
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            Title VARCHAR(512),
            Author VARCHAR(255),
            `Price($)` DECIMAL(10,2),
            Rating DECIMAL(4,2)
        );
    """)

    # Truncate table for idempotency
    cursor.execute(f"TRUNCATE TABLE {table_name};")

    # Insert rows
    insert_query = f"""
        INSERT INTO {table_name} (Title, Author, `Price($)`, Rating)
        VALUES (%s, %s, %s, %s)
    """

    for _, row in df.iterrows():
        try:
            cursor.execute(
                insert_query,
                (row["Title"], row["Author"], row["Price($)"], row["Rating"])
            )
        except Exception as e:
            # For demo purposes we simply skip bad rows.
            # In real pipelines, you'd log or send them to a dead-letter table.
            print(f"[LOAD] Skipped corrupted row due to error: {e}")

    conn.commit()
    conn.close()

    print(f"[LOAD] Table '{table_name}' refreshed with {len(df)} rows.")

This task reads the cleaned dataset and inserts it into a table named amazon_books_data inside your airflow_db database.
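If the dataset grows, the row-by-row insert loop can become a bottleneck. A common variation, sketched below using the same df, cursor, and insert_query as in load_to_mysql, is a single executemany() call; the trade-off is that you lose the per-row error handling that lets the loop skip individual corrupted rows.

# Bulk-insert variation (sketch): one round trip instead of one per row.
rows = list(
    df[["Title", "Author", "Price($)", "Rating"]].itertuples(index=False, name=None)
)
cursor.executemany(insert_query, rows)
conn.commit()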


You will also notice that, in the code above, we use a TRUNCATE statement before inserting new rows. This turns the loading step into a full-refresh pattern, making the task idempotent. In other words, running the DAG multiple times produces the same final table instead of accumulating duplicate snapshots from previous days. This is ideal for scraped datasets like Amazon listings, where we want each day’s table to represent only the latest available snapshot.

At this stage, your workflow is fully defined and properly instantiated in Airflow, with each task connected in the correct order to form a complete ETL pipeline. Your DAG structure should now look like this:

def amazon_books_etl():
    # Task dependencies
    raw_file = get_amazon_data_books()
    transformed_file = transform_amazon_books(raw_file)
    load_to_mysql(transformed_file)

dag = amazon_books_etl()


After starting your environment (docker compose up -d) and letting the DAG complete a run, you can verify the results inside MySQL:

USE airflow_db;
SHOW TABLES;
SELECT * FROM amazon_books_data LIMIT 5;

Your table should now contain the latest snapshot of data engineering books, automatically updated daily through Airflow’s scheduling system.

As an alternative to mysql.connector, you can define the connection once in the Airflow Connections UI and use MySqlHook directly, which keeps credentials out of your DAG code.
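Here is a minimal sketch of that approach. It assumes the apache-airflow-providers-mysql package is installed and that a connection with the ID my_mysql_conn (a name chosen for this example) has been created under Admin > Connections in the Airflow UI.

import pandas as pd
import numpy as np
from airflow.providers.mysql.hooks.mysql import MySqlHook

def load_with_hook(transformed_file: str, table_name: str = "amazon_books_data"):
    """Same truncate-and-load pattern, but credentials come from the Airflow connection."""
    df = pd.read_csv(transformed_file).replace({np.nan: None})

    hook = MySqlHook(mysql_conn_id="my_mysql_conn")  # hypothetical connection ID
    conn = hook.get_conn()  # standard DB-API connection
    cursor = conn.cursor()

    cursor.execute(f"TRUNCATE TABLE {table_name};")
    cursor.executemany(
        f"INSERT INTO {table_name} (Title, Author, `Price($)`, Rating) VALUES (%s, %s, %s, %s)",
        list(df[["Title", "Author", "Price($)", "Rating"]].itertuples(index=False, name=None)),
    )
    conn.commit()
    conn.close()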


Adding Git Sync, CI, and Alerts

At this point, you’ve successfully extracted, transformed, and loaded your data. However, your DAGs are still stored locally on your computer, which makes it difficult for collaborators to contribute and puts your entire workflow at risk if your machine fails or gets corrupted.

In this final section, we’ll introduce a few production-like patterns: version control, automated DAG syncing, basic CI checks, and failure alerts. These don’t make the project fully production-ready, but they represent the core practices most data engineers start with when moving beyond local development. The goal here is to show the essential workflow: storing DAGs in Git, syncing them automatically, validating updates before deployment, and receiving notifications when something breaks.

(For a more detailed explanation of the Git Sync setup shown below, you can read the extended breakdown here.)

To begin, create a public or private repository named airflow_dags (e.g., https://github.com/<your-username>/airflow_dags).

Then, in your project root (airflow-docker), initialize Git and push your local dags/ directory:

git init
git remote add origin https://github.com/<your-username>/airflow_dags.git
git add dags/
git commit -m "Add Airflow ETL pipeline DAGs"
git branch -M main
git push -u origin main

Once complete, your DAGs live safely in GitHub, ready for syncing.

1. Automating DAGs with Git Sync

Rather than manually copying DAG files into your Airflow container, we can automate this using git-sync. This lightweight sidecar container continuously clones your GitHub repository into a shared volume.

Each time you push new DAG updates to GitHub, git-sync automatically pulls them into your Airflow environment, no rebuilds, no restarts. This ensures every environment always runs the latest, version-controlled workflow.

As we saw previously, we need to add a new git-sync service to our docker-compose.yaml and create a shared volume called airflow-dags-volume (this can be any name, just make it consistent) that both git-sync and Airflow will use.

services:
  git-sync:
    image: registry.k8s.io/git-sync/git-sync:v4.1.0
    user: "0:0"    # run as root so it can create /dags/git-sync
    restart: always
    environment:
      GITSYNC_REPO: "https://github.com/<your-username>/airflow-dags.git"
      GITSYNC_BRANCH: "main"           # use BRANCH not REF
      GITSYNC_PERIOD: "30s"
      GITSYNC_DEPTH: "1"
      GITSYNC_ROOT: "/dags/git-sync"
      GITSYNC_DEST: "repo"
      GITSYNC_LINK: "current"
      GITSYNC_ONE_TIME: "false"
      GITSYNC_ADD_USER: "true"
      GITSYNC_CHANGE_PERMISSIONS: "1"
      GITSYNC_STALE_WORKTREE_TIMEOUT: "24h"
    volumes:
      - airflow-dags-volume:/dags
    healthcheck:
      test: ["CMD-SHELL", "test -L /dags/git-sync/current && test -d /dags/git-sync/current/dags && [ \"$(ls -A /dags/git-sync/current/dags 2>/dev/null)\" ]"]
      interval: 10s
      timeout: 3s
      retries: 10
      start_period: 10s

volumes:
  airflow-dags-volume:

We then replace the original DAGs mount line in the volumes section (- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags) with - airflow-dags-volume:/opt/airflow/dags so Airflow reads DAGs directly from the synchronized Git volume.

We also set AIRFLOW__CORE__DAGS_FOLDER to /opt/airflow/dags/git-sync/current/dags so Airflow always points to the latest synced repository.

Finally, each Airflow service (airflow-apiserver, airflow-triggerer, airflow-dag-processor, and airflow-scheduler) is updated with a depends_on condition to ensure they only start after git-sync has successfully cloned the DAGs:

depends_on:
  git-sync:
    condition: service_healthy

2. Adding Continuous Integration (CI) with GitHub Actions

To catch broken DAGs early, we can add a lightweight GitHub Actions pipeline that validates DAG syntax whenever changes to dags/ are pushed. To strictly gate merges into main, you would run the same check on pull requests with branch protection enabled; the push-triggered version below keeps the example simple.

Create a file in your repository:

.github/workflows/validate-dags.yml

name: Validate Airflow DAGs

on:
  push:
    branches: [ main ]
    paths:
      - 'dags/**'

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      # Install all required packages for your DAG imports
      # (instead of only installing Apache Airflow)
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      # Validate that all DAGs parse correctly.
      # This imports every DAG file; if any import fails, CI fails.
      - name: Validate DAGs
        env:
          # Point Airflow at the repo's dags/ folder so it parses these DAGs,
          # not the default ~/airflow/dags location.
          AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/dags
          AIRFLOW__CORE__LOAD_EXAMPLES: "False"
        run: |
          echo "Validating DAG syntax..."
          airflow dags list || exit 1

This workflow runs automatically whenever DAG changes are pushed, so a DAG that fails to import is flagged in CI immediately instead of breaking silently inside Airflow.
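You can run essentially the same check locally before pushing. The snippet below uses Airflow’s DagBag to parse everything under dags/ and fail loudly on import errors; it assumes Airflow and your DAGs’ dependencies are installed in your local environment.

from airflow.models import DagBag

# Parse every file under dags/ without loading Airflow's bundled example DAGs.
dag_bag = DagBag(dag_folder="dags/", include_examples=False)

if dag_bag.import_errors:
    raise SystemExit(f"DAG import errors found: {dag_bag.import_errors}")

print(f"Parsed {len(dag_bag.dags)} DAG(s) successfully.")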

3. Setting Up Alerts for Failures

Finally, for real-time visibility, Airflow provides alerting mechanisms that can notify you of any failed tasks via email or Slack.

Add this configuration under your DAG’s default_args:

default_args = {
    "owner": "Data Engineering Team",
    "email": ["[email protected]"],
    "email_on_failure": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}
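For completeness, here is a sketch of how these defaults attach to the decorated DAG. The schedule and start_date values are illustrative placeholders; keep whatever your existing amazon_books_etl definition already uses.

from datetime import datetime, timedelta
from airflow.decorators import dag

default_args = {
    "owner": "Data Engineering Team",
    "email": ["[email protected]"],
    "email_on_failure": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

@dag(
    schedule="@daily",                # illustrative: daily refresh
    start_date=datetime(2025, 1, 1),  # illustrative start date
    catchup=False,
    default_args=default_args,        # applied to every task in this DAG
)
def amazon_books_etl():
    raw_file = get_amazon_data_books()
    transformed_file = transform_amazon_books(raw_file)
    load_to_mysql(transformed_file)

dag = amazon_books_etl()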

If an extraction fails (for instance, Amazon changes its HTML structure or MySQL goes offline), Airflow retries the task per the settings above and, if it still fails, sends an alert with the error log and task details. Note that email alerts only work once SMTP is configured for your Airflow deployment, either in the [smtp] section of airflow.cfg or through the corresponding AIRFLOW__SMTP__* environment variables.

Summary and Up Next

In this tutorial, you built a complete, real-world ETL pipeline in Apache Airflow by moving beyond simulated workflows and extracting live book data from Amazon. You designed a production-style workflow that scrapes, cleans, and loads Amazon listings into MySQL, while organizing your code using Airflow’s TaskFlow API for clarity, modularity, and reliability.

You then strengthened your setup by integrating Git-based DAG management with git-sync, adding GitHub Actions CI to automatically validate workflows, and enabling alerting so failures are detected and surfaced immediately.

Together, these improvements transformed your project into a version-controlled, automated orchestration system that mirrors real production environments and prepares you for cloud deployment.

As a next step, you can expand this workflow by exploring other Amazon categories or by applying the same scraping and ETL techniques to entirely different websites, such as OpenWeather for weather insights or Indeed for job listings. This will broaden your data engineering experience with new, real-world data sources. Running Airflow in the cloud is also an important milestone, and exploring cloud-based Airflow deployments is a natural way to deepen your understanding further.
