Tutorial, part four: Building an asset graph

In the previous portion of the tutorial, you wrote your first Software-defined asset (SDA), looked at Dagster's UI, and manually materialized your asset.

Continuing from there, you will:

  • Add more assets to your Dagster project
  • Connect them to finish migrating the pipeline
  • Give users more knowledge about the assets by adding metadata and logging

Step 1: Adding the DataFrame asset

By the end of the previous section, you ingested the top Hacker News story IDs into Dagster. Using this data, you'll now look up each story by its ID, ingest that data, and make a DataFrame out of it. You'll connect your current asset with this new asset to establish dependencies and make an asset graph.

Copy and paste the completed asset code below into assets.py, adding any new imports, such as import pandas as pd, to the top of the file:

import pandas as pd # Add new imports to the top of `assets.py`

@asset
def topstories(topstory_ids):  # this asset depends on topstory_ids
    results = []
    for item_id in topstory_ids:
        item = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json").json()
        results.append(item)

        if len(results) % 20 == 0:
            print(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)

    return df

Dependencies between assets are defined by adding the upstream asset's name as a parameter to the dependent asset's function. In this case, topstory_ids (the list of IDs) is a dependency of topstories (the DataFrame), so Dagster passes the materialized list of IDs into topstories as its argument.
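To build intuition for how a parameter name can stand in for a dependency, here is a minimal plain-Python sketch. It is not Dagster's implementation: infer_dependencies and run_in_order are hypothetical helpers, the return values are made up, and the sketch skips concerns such as cycle detection and storage.

```python
import inspect


def topstory_ids():
    # Placeholder data standing in for the real Hacker News IDs.
    return [101, 102, 103]


def topstories(topstory_ids):
    # The parameter name matches the upstream function's name.
    return [{"id": item_id} for item_id in topstory_ids]


def infer_dependencies(funcs):
    """Map each function name to the parameter names it declares."""
    return {f.__name__: list(inspect.signature(f).parameters) for f in funcs}


def run_in_order(funcs):
    """Run every function once, after the functions its parameters name."""
    by_name = {f.__name__: f for f in funcs}
    results = {}

    def run(name):
        if name not in results:
            func = by_name[name]
            args = [run(param) for param in inspect.signature(func).parameters]
            results[name] = func(*args)
        return results[name]

    for name in by_name:
        run(name)
    return results


deps = infer_dependencies([topstory_ids, topstories])
values = run_in_order([topstories, topstory_ids])
print(deps)
print(values["topstories"])
```

The order in which the functions are listed does not matter here, because each one pulls in the results it needs by name before it runs.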

In your browser, navigate to Dagster's UI (localhost:3000) and look at the asset graph to see the relationship between your assets.

Flexibility with asset dependencies

If this syntax is too magical for you, an asset's dependencies can be explicitly defined using a parameter called ins in the @asset decorator. To learn about this syntax, refer to the Defining explicit dependencies section of the Assets concept page.

Other times, the business logic can't be contained in a single function, or you never want to load the data into memory. In situations like these, an asset can return no data, and dependencies can be declared without loading the upstream data in the dependent asset's function. Refer to the guide on Assets without I/O for more information about these patterns.

Logging during asset materialization

In the code above, a print statement is used to show progress while fetching stories from the Hacker News API. Dagster has a built-in logger that extends past print and other Python logging methods. This logger shows exactly where logging happens in your Dagster project, such as which asset and materialization the log came from.

To access the logger, use the get_dagster_logger function. The code below replaces the print statement with the Dagster logger:

import pandas as pd
# Addition: import `get_dagster_logger`
from dagster import asset, get_dagster_logger

@asset
def topstories(topstory_ids):
    logger = get_dagster_logger()

    results = []
    for item_id in topstory_ids:
        item = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json").json()
        results.append(item)

        if len(results) % 20 == 0:
            logger.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)

    return df

For more information about loggers, such as the different types of logging, refer to the Logging documentation.
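Assuming, as the Dagster API documentation describes, that get_dagster_logger returns a standard Python logging.Logger, the progress-logging pattern above can be tried outside Dagster with the standard library alone. The sketch below is illustrative: fetch_items and ListHandler are hypothetical names, and a fake fetch function stands in for the Hacker News request.

```python
import logging


def fetch_items(item_ids, fetch, logger, every=20):
    """Fetch each item and log progress after every `every` results."""
    results = []
    for item_id in item_ids:
        results.append(fetch(item_id))
        if len(results) % every == 0:
            logger.info("Got %d items so far.", len(results))
    return results


class ListHandler(logging.Handler):
    """Collect formatted log messages in a list so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


logger = logging.getLogger("hackernews_sketch")
logger.setLevel(logging.INFO)
handler = ListHandler()
logger.addHandler(handler)

# A fake fetch function replaces the network call.
items = fetch_items(range(45), lambda item_id: {"id": item_id}, logger)
print(handler.messages)
```

Passing the logger in as an argument keeps the loop easy to test; inside an asset, the same function could be handed the logger from get_dagster_logger instead.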


Step 2: Creating an unstructured data asset

Along with structured data like tables, Dagster's assets can also be unstructured data, such as images. Your next and final asset will take the DataFrame of stories and create a word cloud with the titles.

Below is the finished code for a word cloud asset. Copy and paste the code into assets.py:

import matplotlib.pyplot as plt
import base64
from io import BytesIO
from wordcloud import STOPWORDS, WordCloud

# Lines above are added to the top of the file

# adding the word cloud asset
@asset
def topstories_word_cloud(topstories):
    stopwords = set(STOPWORDS)
    stopwords.update(["Ask", "Show", "HN", "S"])
    titles_text = " ".join([str(item) for item in topstories["title"]])
    titles_cloud = WordCloud(stopwords=stopwords, background_color="white").generate(titles_text)

    # Generate the word cloud image
    plt.figure(figsize=(8, 8), facecolor=None)
    plt.imshow(titles_cloud, interpolation="bilinear")
    plt.axis("off")
    plt.tight_layout(pad=0)

    # Since we're saving the asset, this converts the word cloud into an image
    buffer = BytesIO()
    plt.savefig(buffer, format="png")
    image_data = base64.b64encode(buffer.getvalue())

    return image_data # returns the image instead of showing it with `plt.show()`

Step 3: Educating users with metadata

Software-defined assets can be enriched with different types of metadata. Anything can be used as metadata for an SDA. Common details to add are:

  • Statistics about the data, such as row counts or other data profiling
  • Test results or assertions about the data
  • Images or tabular previews of the asset
  • Information about who owns the asset, where it's stored, and links to external documentation

The following code adds a row count and a preview of the topstories asset. It uses the Output class from Dagster to bundle the asset and its metadata together. Update your code for the topstories asset to match the changes below.

from dagster import asset, get_dagster_logger, Output, MetadataValue
# Addition: import the `Output` and `MetadataValue` classes from the dagster library

@asset
def topstories(topstory_ids):
    logger = get_dagster_logger()

    results = []
    for item_id in topstory_ids:
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)

        if len(results) % 20 == 0:
            logger.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)

    return Output(  # The return value is updated to wrap it in `Output` class
        value=df,   # The original df is passed in with the `value` parameter
        metadata={
            "num_records": len(df), # Metadata can be any key-value pair
            "preview": MetadataValue.md(df.head().to_markdown()),
            # The `MetadataValue` class has useful static methods to build Metadata
        }
    )
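To get a feel for what the two metadata entries hold, here is a plain-Python sketch that builds a comparable row count and Markdown preview by hand from a list of records, without pandas. The records_to_markdown helper and the sample stories are hypothetical; in the asset above, len(df) and df.head().to_markdown() do this work (note that pandas' to_markdown relies on the optional tabulate package).

```python
def records_to_markdown(records, limit=5):
    """Render the first `limit` records as a Markdown table.

    Illustrative only: values containing `|` are not escaped.
    """
    rows = records[:limit]
    if not rows:
        return ""
    columns = list(rows[0])
    header = "| " + " | ".join(columns) + " |"
    divider = "| " + " | ".join("---" for _ in columns) + " |"
    body = [
        "| " + " | ".join(str(row.get(col, "")) for col in columns) + " |"
        for row in rows
    ]
    return "\n".join([header, divider, *body])


# Made-up sample data standing in for the fetched stories.
stories = [
    {"id": 1, "title": "Show HN: A"},
    {"id": 2, "title": "Ask HN: B"},
]

metadata = {
    "num_records": len(stories),
    "preview": records_to_markdown(stories),
}
print(metadata["preview"])
```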

Reload the definitions and re-materialize your assets. The metadata can then be seen in the following places:

  • In the Asset graph page, click on an asset and its metadata will be shown in the right sidebar:

    the asset's materialization metadata
  • In the Asset Catalog's page for the topstories asset:

    the asset's materialization metadata

Metadata and Markdown

The DataFrame was embedded into the asset's metadata with Markdown. Any valid Markdown snippet can be stored and rendered in the Dagster UI, including images. By embedding the word cloud image as metadata, the word cloud asset is also visible to your team from within the UI.

Below is code that shows how to add an image to asset metadata. Update the topstories_word_cloud asset in your assets.py to match the following:

from dagster import asset, MetadataValue, Output
# Addition: update your imports at the top of your file with this

@asset
def topstories_word_cloud(topstories):
    stopwords = set(STOPWORDS)
    stopwords.update(["Ask", "Show", "HN", "S"])
    titles_text = " ".join([str(item) for item in topstories["title"]])
    titles_cloud = WordCloud(stopwords=stopwords, background_color="white").generate(titles_text)

    plt.figure(figsize=(8, 8), facecolor=None)
    plt.imshow(titles_cloud, interpolation="bilinear")
    plt.axis("off")
    plt.tight_layout(pad=0)

    buffer = BytesIO()
    plt.savefig(buffer, format="png")
    image_data = base64.b64encode(buffer.getvalue())

    # Addition: Turn the image into Markdown to embed it into the metadata
    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    # Addition: Attach the Markdown content as metadata to the asset
    return Output(value=image_data, metadata={
        "plot": MetadataValue.md(md_content)
    })
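The embedding step can be looked at in isolation. The sketch below uses only the standard library to turn raw image bytes into the same kind of Markdown data URI; image_bytes_to_markdown is a hypothetical helper, and the eight-byte PNG file signature stands in for a real image so the example stays small.

```python
import base64


def image_bytes_to_markdown(raw_png: bytes, alt_text: str = "img") -> str:
    """Wrap raw PNG bytes in a Markdown image tag backed by a base64 data URI."""
    encoded = base64.b64encode(raw_png).decode("ascii")
    return f"![{alt_text}](data:image/png;base64,{encoded})"


# The PNG file signature, used here in place of a full image.
png_signature = b"\x89PNG\r\n\x1a\n"

md_content = image_bytes_to_markdown(png_signature)
print(md_content)
```

Because base64 output is plain ASCII text, it can sit inside a Markdown string; the trade-off is that the encoded form is roughly a third larger than the raw bytes, which is worth keeping in mind for large images.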

Reload your definitions and re-materialize your assets. The word cloud image will be visible with the rest of your materialization metadata for the topstories_word_cloud asset. The plot key in the metadata will contain a link that says [Show Markdown]. Clicking the link opens the preview in the Dagster UI. The word cloud will change throughout the day as the top stories change. Here's an example of the word cloud at the time we wrote this tutorial:

the asset's materialization metadata

If your data is sensitive, such as PHI or PII, be careful and follow your organization's policies for surfacing data. You should practice due diligence before showing your data in metadata or logs.


Next Steps

Congratulations! 🎉 You've written your first pipeline in Dagster! You:

  • Wrote three assets
  • Materialized the assets
  • Previewed the data in Dagster's UI
  • Empowered stakeholders and your future self with metadata and logging

In the next portion, you'll learn how to schedule your pipeline to automatically update your assets regularly.