Getting Started with Chainlink Data Streams (Hardhat CLI)

This guide shows you how to read data from a Data Streams stream, verify the answer onchain, and store it.

This example uses the Streams Trade implementation, with a Chainlink Automation Log Trigger to check for events that require data. For this example, the log trigger comes from a simple emitter contract. Chainlink Automation then uses StreamsLookup to retrieve a signed report from the Data Streams Aggregation Network, return the data in a callback, and run the performUpkeep function on your registered upkeep contract. The performUpkeep function calls the verify function on the verifier contract.

Note: To learn how to use the Streams Direct implementation of Data Streams, see the Fetch and decode reports via a REST API guide or the Stream and decode reports via WebSocket guide.

Before you begin

  • Chainlink Data Streams is available in Early Access. Contact us to request mainnet or testnet early access.
  • This guide uses the Hardhat development environment to deploy and interact with the contracts. To learn more about Hardhat, read the Hardhat Documentation.

Requirements

  • Git: Make sure you have Git installed. You can check your current version by running git --version in your terminal and download the latest version from the official Git website if necessary.
  • Node.js and npm: Install the latest release of Node.js 20. Optionally, you can use nvm to switch between installed Node.js versions with nvm use 20. To confirm that you are running the correct version, run node -v in a terminal:
     $ node -v
     v20.11.0
    
  • Testnet funds: This guide requires testnet ETH and LINK on Arbitrum Sepolia. Both are available at faucets.chain.link.

Tutorial

Setup

  1. Clone the repository that contains the Hardhat project setup for this guide. This repository contains the Solidity contracts and the Hardhat configuration files you need to deploy and interact with the contracts.

    git clone https://github.com/smartcontractkit/smart-contract-examples.git
    cd smart-contract-examples/data-streams/getting-started/hardhat
    
  2. Install the dependencies:

    npm install
    
  3. Set an encryption password for your environment variables. This password needs to be set each time you create or restart a terminal session.

    npx env-enc set-pw
    
  4. Set the required environment variables using the following command:

    npx env-enc set
    
    • PRIVATE_KEY: The private key for your testnet wallet that will deploy and interact with the contracts. If you use MetaMask, follow the instructions to Export a Private Key.
    • ARBITRUM_SEPOLIA_RPC_URL: The Remote Procedure Call (RPC) URL for the Arbitrum Sepolia network. You can obtain one by creating an account on Alchemy or Infura and setting up an Arbitrum Sepolia project.

Deploy the upkeep and the log emitter contracts

Deploy an upkeep contract that is enabled to retrieve data from Data Streams. For this example, you will read from the ETH/USD stream with ID 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782 on Arbitrum Sepolia. See the Data Streams Crypto Addresses page for a complete list of available crypto assets, IDs, and verifier proxy addresses.

Execute the following command to deploy the Chainlink Automation upkeep contract and the Log Emitter contract to the Arbitrum Sepolia network.

npx hardhat deployAll --network arbitrumSepolia

Expect output similar to the following in your terminal:

ℹ Deploying StreamsUpkeepRegistrar contract...
✔ StreamsUpkeepRegistrar deployed at: 0x48403478Aa021A9BC30Da0BDE47cbc155CcA8916
ℹ Deploying LogEmitter contract...
✔ LogEmitter deployed at: 0xD721337a827F9D814daEcCc3c7e72300af914BFE
✔ All contracts deployed successfully.

Save the deployed contract addresses for both contracts. You will use these addresses later.

Fund the upkeep contract

In this example, the upkeep contract pays for onchain verification of reports from Data Streams. The Automation subscription does not cover the cost. Transfer 1.5 testnet LINK to the upkeep contract address you saved earlier. You can retrieve unused LINK later.

npx hardhat transfer-link --recipient <StreamsUpkeepRegistrarAddress> --amount 1500000000000000000 --network arbitrumSepolia

Replace <StreamsUpkeepRegistrarAddress> with the address of the StreamsUpkeepRegistrar contract you saved earlier.

Expect output similar to the following in your terminal:

ℹ Starting LINK transfer from <YOUR_ADDRESS> to the streams upkeep contract at 0xD721337a827F9D814daEcCc3c7e72300af914BFE
ℹ LINK token address: 0xb1D4538B4571d411F07960EF2838Ce337FE1E80E
ℹ LINK balance of sender 0x45C90FBb5acC1a5c156a401B56Fea55e69E7669d is 6.5 LINK
✔ 1.5 LINK were sent from 0x45C90FBb5acC1a5c156a401B56Fea55e69E7669d to 0xD721337a827F9D814daEcCc3c7e72300af914BFE. Transaction Hash: 0xf241bf4415ec081325ccd8ec3d54432e424afd16f1c81fa78b291ae9a0c03ce2
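
The --amount value in the transfer command is expressed in the smallest unit of LINK. LINK uses 18 decimals, so 1.5 LINK corresponds to 1500000000000000000, which matches the "1.5 LINK were sent" line in the output. A small sketch of that arithmetic in Solidity follows; the constant names are illustrative assumptions and do not appear in the example repository.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity 0.8.19;

// Illustrative constants only; these names are assumptions, not part of the example project.
uint256 constant LINK_DECIMALS = 18;
uint256 constant ONE_LINK = 10 ** LINK_DECIMALS; // 1000000000000000000
uint256 constant TRANSFER_AMOUNT = (15 * ONE_LINK) / 10; // 1.5 LINK = 1500000000000000000
```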

Register and fund the upkeep

Programmatically register and fund a new Log Trigger upkeep with 1 LINK:

npx hardhat registerAndFundUpkeep --streams-upkeep <StreamsUpkeepRegistrarAddress> --log-emitter <LogEmitterAddress> --network arbitrumSepolia

Replace <StreamsUpkeepRegistrarAddress> and <LogEmitterAddress> with the addresses of your StreamsUpkeepRegistrar and LogEmitter contracts.

Expect output similar to the following in your terminal:

✔ Upkeep registered and funded with 1 LINK successfully.

Emit a log

Now, you can use your emitter contract to emit a log and initiate the upkeep, which retrieves data for the specified stream ID.

npx hardhat emitLog --log-emitter <LogEmitterAddress> --network arbitrumSepolia

Replace <LogEmitterAddress> with the address of your LogEmitter contract.

Expect output similar to the following in your terminal:

✔ Log emitted successfully in transaction: 0x236ee95faade12d1b6d497ee2e51ddf957f7d4986ffe51d784b923081ed440ff

After the transaction is complete, the log is emitted, and the upkeep is triggered.

View the retrieved price

The retrieved price is stored in the lastDecodedPrice contract variable and emitted in the logs. To see the price retrieved by the StreamsUpkeepRegistrar contract:

npx hardhat getLastRetrievedPrice --streams-upkeep <StreamsUpkeepRegistrarAddress> --network arbitrumSepolia

Replace <StreamsUpkeepRegistrarAddress> with the address of your StreamsUpkeepRegistrar contract.

Expect output similar to the following in your terminal:

✔ Last Retrieved Price: 2945878120219995000000

The answer on the ETH/USD stream uses 18 decimal places, so an answer of 2945878120219995000000 indicates an ETH/USD price of 2,945.878120219995. Some streams may use a different number of decimal places for answers. See the Data Streams Crypto Addresses page for more information.
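
As a rough sketch of the scaling described above, the following library splits an 18-decimal answer into its whole and fractional parts. The library and function names are hypothetical and are not part of the example repository, and the sketch assumes a stream that reports 18 decimals.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity 0.8.19;

// Hypothetical helper for illustration; assumes an 18-decimal stream.
library PriceFormat {
    int256 private constant SCALE = 1e18;

    // Splits an 18-decimal answer into whole units and the fractional remainder.
    function split18(
        int192 answer
    ) internal pure returns (int256 whole, int256 fraction) {
        whole = int256(answer) / SCALE;
        fraction = int256(answer) % SCALE;
    }
}
```

For the answer 2945878120219995000000 shown above, split18 returns whole = 2945 and fraction = 878120219995000000, which reads as 2,945.878120219995.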

Alternatively, you can view the price emitted in the logs for your upkeep transaction.

You can find the upkeep transaction hash in the Chainlink Automation UI and view the transaction logs in the Arbitrum Sepolia explorer.

Examine the code

The example code you deployed has all the interfaces and functions required to work with Chainlink Automation as an upkeep contract. It follows a flow similar to the trading flow in the Architecture documentation but uses a basic log emitter to simulate the client contract that would initiate a StreamsLookup. After the contract receives and verifies the report, performUpkeep stores the price from the report in the lastDecodedPrice state variable and emits a PriceUpdate log message with the price. You can modify this logic to use the data in a way that fits your application.

The code example uses revert with StreamsLookup to convey call information about what streams to retrieve. See the EIP-3668 rationale for more information about how to use revert in this way.

// SPDX-License-Identifier: MIT
pragma solidity 0.8.19;

import {Common} from "@chainlink/contracts/src/v0.8/llo-feeds/libraries/Common.sol";
import {StreamsLookupCompatibleInterface} from "@chainlink/contracts/src/v0.8/automation/interfaces/StreamsLookupCompatibleInterface.sol";
import {ILogAutomation, Log} from "@chainlink/contracts/src/v0.8/automation/interfaces/ILogAutomation.sol";
import {IRewardManager} from "@chainlink/contracts/src/v0.8/llo-feeds/interfaces/IRewardManager.sol";
import {IVerifierFeeManager} from "@chainlink/contracts/src/v0.8/llo-feeds/interfaces/IVerifierFeeManager.sol";
import {IERC20} from "@chainlink/contracts/src/v0.8/vendor/openzeppelin-solidity/v4.8.3/contracts/interfaces/IERC20.sol";
import {LinkTokenInterface} from "@chainlink/contracts/src/v0.8/shared/interfaces/LinkTokenInterface.sol";

/**
 * THIS IS AN EXAMPLE CONTRACT THAT USES UN-AUDITED CODE FOR DEMONSTRATION PURPOSES.
 * DO NOT USE THIS CODE IN PRODUCTION.
 */

/**
 * @dev Defines the parameters required to register a new upkeep.
 * @param name The name of the upkeep to be registered.
 * @param encryptedEmail An encrypted email address associated with the upkeep (optional).
 * @param upkeepContract The address of the contract that requires upkeep.
 * @param gasLimit The maximum amount of gas to be used for the upkeep execution.
 * @param adminAddress The address that will have administrative privileges over the upkeep.
 * @param triggerType An identifier for the type of trigger that initiates the upkeep (`1` for event-based).
 * @param checkData Data passed to the checkUpkeep function to simulate conditions for triggering upkeep.
 * @param triggerConfig Configuration parameters specific to the trigger type.
 * @param offchainConfig Off-chain configuration data, if applicable.
 * @param amount The amount of LINK tokens to fund the upkeep registration.
 */
struct RegistrationParams {
    string name;
    bytes encryptedEmail;
    address upkeepContract;
    uint32 gasLimit;
    address adminAddress;
    uint8 triggerType;
    bytes checkData;
    bytes triggerConfig;
    bytes offchainConfig;
    uint96 amount;
}

/**
 * @dev Interface for the Automation Registrar contract.
 */
interface AutomationRegistrarInterface {
    /**
     * @dev Registers a new upkeep contract with Chainlink Automation.
     * @param requestParams The parameters required for the upkeep registration, encapsulated in `RegistrationParams`.
     * @return upkeepID The unique identifier for the registered upkeep, used for future interactions.
     */
    function registerUpkeep(
        RegistrationParams calldata requestParams
    ) external returns (uint256);
}

// Custom interfaces for Data Streams: IVerifierProxy and IFeeManager
interface IVerifierProxy {
    function verify(
        bytes calldata payload,
        bytes calldata parameterPayload
    ) external payable returns (bytes memory verifierResponse);

    function s_feeManager() external view returns (IVerifierFeeManager);
}

interface IFeeManager {
    function getFeeAndReward(
        address subscriber,
        bytes memory unverifiedReport,
        address quoteAddress
    ) external returns (Common.Asset memory, Common.Asset memory, uint256);

    function i_linkAddress() external view returns (address);

    function i_nativeAddress() external view returns (address);

    function i_rewardManager() external view returns (address);
}

contract StreamsUpkeepRegistrar is
    ILogAutomation,
    StreamsLookupCompatibleInterface
{
    error InvalidReportVersion(uint16 version); // Thrown when performUpkeep receives a report with an unsupported version.

    LinkTokenInterface public immutable i_link;
    AutomationRegistrarInterface public immutable i_registrar;

    /**
     * @dev Represents a data report from a Data Streams feed for v3 schema (crypto feeds).
     * The `price`, `bid`, and `ask` values are carried to either 8 or 18 decimal places, depending on the feed.
     * For more information, see https://docs.chain.link/data-streams/crypto-streams and https://docs.chain.link/data-streams/reference/report-schema
     */
    struct ReportV3 {
        bytes32 feedId; // The feed ID the report has data for.
        uint32 validFromTimestamp; // Earliest timestamp for which price is applicable.
        uint32 observationsTimestamp; // Latest timestamp for which price is applicable.
        uint192 nativeFee; // Base cost to validate a transaction using the report, denominated in the chain’s native token (e.g., WETH/ETH).
        uint192 linkFee; // Base cost to validate a transaction using the report, denominated in LINK.
        uint32 expiresAt; // Latest timestamp where the report can be verified onchain.
        int192 price; // DON consensus median price (8 or 18 decimals).
        int192 bid; // Simulated price impact of a buy order up to the X% depth of liquidity utilisation (8 or 18 decimals).
        int192 ask; // Simulated price impact of a sell order up to the X% depth of liquidity utilisation (8 or 18 decimals).
    }

    /**
     * @dev Represents a data report from a Data Streams feed for v4 schema (RWA feeds).
     * The `price` value is carried to either 8 or 18 decimal places, depending on the feed.
     * The `marketStatus` indicates whether the market is currently open. Possible values: `0` (`Unknown`), `1` (`Closed`), `2` (`Open`).
     * For more information, see https://docs.chain.link/data-streams/rwa-streams and https://docs.chain.link/data-streams/reference/report-schema-v4
     */
    struct ReportV4 {
        bytes32 feedId; // The feed ID the report has data for.
        uint32 validFromTimestamp; // Earliest timestamp for which price is applicable.
        uint32 observationsTimestamp; // Latest timestamp for which price is applicable.
        uint192 nativeFee; // Base cost to validate a transaction using the report, denominated in the chain’s native token (e.g., WETH/ETH).
        uint192 linkFee; // Base cost to validate a transaction using the report, denominated in LINK.
        uint32 expiresAt; // Latest timestamp where the report can be verified onchain.
        int192 price; // DON consensus median benchmark price (8 or 18 decimals).
        uint32 marketStatus; // The DON's consensus on whether the market is currently open.
    }

    struct Quote {
        address quoteAddress;
    }

    event PriceUpdate(int192 indexed price);

    IVerifierProxy public verifier;

    address public FEE_ADDRESS;
    string public constant DATASTREAMS_FEEDLABEL = "feedIDs";
    string public constant DATASTREAMS_QUERYLABEL = "timestamp";
    int192 public lastDecodedPrice;
    uint256 s_upkeepID;
    bytes public s_LogTriggerConfig;

    // Find a complete list of IDs at https://docs.chain.link/data-streams/crypto-streams
    string[] public feedIds;

    constructor(
        address _verifier,
        LinkTokenInterface link,
        AutomationRegistrarInterface registrar,
        string[] memory _feedIds
    ) {
        verifier = IVerifierProxy(_verifier);
        i_link = link;
        i_registrar = registrar;
        feedIds = _feedIds;
    }

    /**
     * @notice Registers a new upkeep using the specified parameters and predicts its ID.
     * @dev This function first approves the transfer of LINK tokens specified in `params.amount` to the Automation Registrar contract.
     *      It then registers the upkeep and stores its ID if registration is successful.
     *      Reverts if auto-approve is disabled or registration fails.
     * @param params The registration parameters, including name, upkeep contract address, gas limit, admin address, trigger type, and funding amount.
     */
    function registerAndPredictID(RegistrationParams memory params) public {
        i_link.approve(address(i_registrar), params.amount);
        uint256 upkeepID = i_registrar.registerUpkeep(params);
        if (upkeepID != 0) {
            s_upkeepID = upkeepID; // DEV - Use the upkeepID however you see fit
        } else {
            revert("auto-approve disabled");
        }
    }

    /**
     * @notice Optional StreamsLookup function that surfaces errors raised while fetching reports.
     * @return upkeepNeeded boolean to indicate whether the keeper should call performUpkeep or not.
     * @return performData bytes that the keeper should call performUpkeep with, if
     * upkeep is needed. If you would like to encode data to decode later, try `abi.encode`.
     */
    function checkErrorHandler(
        uint256 /*errCode*/,
        bytes memory /*extraData*/
    ) external pure returns (bool upkeepNeeded, bytes memory performData) {
        return (true, "0");
        // Hardcoded to always perform upkeep.
        // Read the StreamsLookup error handler guide for more information.
        // https://docs.chain.link/chainlink-automation/guides/streams-lookup-error-handler
    }

    // This function uses revert to convey call information.
    // See https://eips.ethereum.org/EIPS/eip-3668#rationale for details.
    function checkLog(
        Log calldata log,
        bytes memory
    ) external returns (bool upkeepNeeded, bytes memory performData) {
        revert StreamsLookup(
            DATASTREAMS_FEEDLABEL,
            feedIds,
            DATASTREAMS_QUERYLABEL,
            log.timestamp,
            ""
        );
    }

    // The Data Streams report bytes are passed here.
    // extraData is context data from the feed lookup process.
    // Your contract may include logic to further process this data.
    // This method is intended only to be simulated offchain by Automation.
    // The data returned is then passed by Automation into performUpkeep.
    function checkCallback(
        bytes[] calldata values,
        bytes calldata extraData
    ) external pure returns (bool, bytes memory) {
        return (true, abi.encode(values, extraData));
    }

    // This function is executed onchain by Automation.
    function performUpkeep(bytes calldata performData) external {
        // Decode the performData bytes passed in by CL Automation.
        // This contains the data returned by your implementation in checkCallback().
        (bytes[] memory signedReports, bytes memory extraData) = abi.decode(
            performData,
            (bytes[], bytes)
        );

        bytes memory unverifiedReport = signedReports[0];

        (, /* bytes32[3] reportContextData */ bytes memory reportData) = abi
            .decode(unverifiedReport, (bytes32[3], bytes));

        // Extract report version from reportData
        uint16 reportVersion = (uint16(uint8(reportData[0])) << 8) |
            uint16(uint8(reportData[1]));

        // Validate report version
        if (reportVersion != 3 && reportVersion != 4) {
            revert InvalidReportVersion(reportVersion);
        }

        // Report verification fees
        IFeeManager feeManager = IFeeManager(address(verifier.s_feeManager()));
        IRewardManager rewardManager = IRewardManager(
            address(feeManager.i_rewardManager())
        );

        address feeTokenAddress = feeManager.i_linkAddress();
        (Common.Asset memory fee, , ) = feeManager.getFeeAndReward(
            address(this),
            reportData,
            feeTokenAddress
        );

        // Approve rewardManager to spend this contract's balance in fees
        IERC20(feeTokenAddress).approve(address(rewardManager), fee.amount);

        // Verify the report
        bytes memory verifiedReportData = verifier.verify(
            unverifiedReport,
            abi.encode(feeTokenAddress)
        );

        // Decode verified report data into the appropriate Report struct based on reportVersion
        if (reportVersion == 3) {
            // v3 report schema
            ReportV3 memory verifiedReport = abi.decode(
                verifiedReportData,
                (ReportV3)
            );

            // Log the price from the report
            emit PriceUpdate(verifiedReport.price);

            // Store the price from the report
            lastDecodedPrice = verifiedReport.price;
        } else if (reportVersion == 4) {
            // v4 report schema
            ReportV4 memory verifiedReport = abi.decode(
                verifiedReportData,
                (ReportV4)
            );

            // Log the price from the report
            emit PriceUpdate(verifiedReport.price);

            // Store the price from the report
            lastDecodedPrice = verifiedReport.price;
        }
    }
}
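
The revert-based lookup used in checkLog can be seen in isolation in the following sketch. It assumes two hypothetical contracts, LookupRequester and LookupReader, and a hypothetical OffchainLookupSketch error; none of them is part of the example repository or of the Chainlink interfaces. The requester reverts with a custom error that carries the request parameters, and the reader catches the revert data and checks its 4-byte error selector, which is broadly how a caller can recognize such a request.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity 0.8.19;

// Hypothetical contracts for illustration; not part of the example repository.
contract LookupRequester {
    // Custom error whose arguments describe the data the caller should fetch.
    error OffchainLookupSketch(string paramKey, string[] ids, uint256 time);

    // Always reverts; the revert data carries the lookup request.
    function request(string[] calldata ids) external view {
        revert OffchainLookupSketch("feedIDs", ids, block.timestamp);
    }
}

contract LookupReader {
    // Calls the requester and reports whether it reverted with the expected error.
    function isLookupRequest(
        LookupRequester requester,
        string[] calldata ids
    ) external view returns (bool) {
        try requester.request(ids) {
            return false; // No revert, so no lookup was requested.
        } catch (bytes memory revertData) {
            // The first 4 bytes of the revert data are the error selector.
            return
                revertData.length >= 4 &&
                bytes4(revertData) ==
                LookupRequester.OffchainLookupSketch.selector;
        }
    }
}
```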

Initializing the contract

When you deploy the contract, you define:

  1. The verifier proxy address, which you can find on the Data Streams Crypto Addresses page. The IVerifierProxy interface provides the following functions:

    • The s_feeManager function, which returns the fee manager contract that the example uses to estimate the verification fees.
    • The verify function, which verifies the report onchain.

  2. The LINK token address. This address is used to register and fund your upkeep. You can find the LINK token address on the Chainlink Token Addresses page.

  3. The registrar's contract address. This address is used to register your upkeep. You can find the registrar contract addresses on the Chainlink Automation Supported Networks page.

  4. The stream IDs to retrieve, passed to the constructor as an array of strings (_feedIds).

Funding the upkeep contract

In this example, you must fund the StreamsUpkeepRegistrar contract with testnet LINK tokens to pay the onchain report verification fees. You can use the transfer-link task to transfer LINK tokens to the StreamsUpkeepRegistrar contract you deployed.

The transfer-link Hardhat task sets up the necessary parameters for the LINK token transfer and submits the transfer request to the LINK token contract using the transfer function.

Note: Funding the StreamsUpkeepRegistrar contract is distinct from funding your Chainlink Automation upkeep to pay the fees to perform the upkeep.

Registering the upkeep

You need to register your log-triggered upkeep with the Chainlink Automation registrar. You can use the registerAndFundUpkeep task to programmatically register the StreamsUpkeepRegistrar and LogEmitter contracts with the Chainlink Automation registrar. The task also funds the upkeep with 1 testnet LINK token.

The registerAndFundUpkeep Hardhat task sets up the necessary parameters for upkeep registration, including trigger configuration for a Log Emitter contract, and submits the registration request to the registrar contract via the registerAndPredictID function.

You can use the Chainlink Automation UI to view the registered upkeep and the upkeep's configuration.

Emitting a log, retrieving, and verifying the report

You can use the emitLog task to emit a log from the LogEmitter contract.

  1. The emitted log triggers the Chainlink Automation upkeep.
  2. Chainlink Automation then uses StreamsLookup to retrieve a signed report from the Data Streams Aggregation Network, returns the data in a callback (checkCallback), and runs the performUpkeep function on your registered upkeep contract.
  3. The performUpkeep function calls the verify function on the verifier contract to verify the report onchain.
  4. In this example, the performUpkeep function also stores the price from the report in the lastDecodedPrice state variable and emits a PriceUpdate log message with the price.
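
Before verification, performUpkeep in this example also reads the report schema version from the first two bytes of the report data and rejects anything other than v3 or v4. The sketch below isolates that step in a hypothetical helper library; the library name, function names, and the ReportTooShort error are assumptions for illustration, while the bit manipulation mirrors the inline code in the example contract.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity 0.8.19;

// Hypothetical helper library; the example contract performs this step inline in performUpkeep.
library ReportVersion {
    error ReportTooShort();

    // Reads the first two bytes of the report data as a big-endian uint16.
    function versionOf(
        bytes memory reportData
    ) internal pure returns (uint16) {
        if (reportData.length < 2) revert ReportTooShort();
        return
            (uint16(uint8(reportData[0])) << 8) |
            uint16(uint8(reportData[1]));
    }

    // Mirrors the example's check: only v3 and v4 reports are decoded.
    function isSupported(uint16 version) internal pure returns (bool) {
        return version == 3 || version == 4;
    }
}
```

Because the report data begins with the feed ID, a report for the ETH/USD stream used in this guide, whose ID starts with 0x0003, yields version 3.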

Viewing the retrieved price

The getLastRetrievedPrice Hardhat task retrieves the last price updated by the performUpkeep function in the lastDecodedPrice state variable of the StreamsUpkeepRegistrar contract.

Feed ID types and conversion

Chainlink Data Streams uses different data types for feed IDs at different stages of the process:

  • The StreamsLookup error requires feed IDs to be provided as an array of strings (string[]).
  • The decoded reports within the contract use the bytes32 type for feed IDs (see the Report Schemas reference).

If your application needs to compare the feed ID(s) sent in the StreamsLookup with those received in the report(s), you must convert between string and bytes32 types.
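
One possible way to do this conversion onchain is sketched below. It assumes feed ID strings are 0x-prefixed and contain exactly 64 hexadecimal digits, like the stream ID used in this guide. The FeedIdCodec library, its functions, and its error are hypothetical and are not part of the example repository.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity 0.8.19;

// Hypothetical helper for illustration; not part of the example repository.
library FeedIdCodec {
    error InvalidFeedIdString();

    // Parses a "0x"-prefixed, 64-digit hex string into bytes32.
    function toBytes32(
        string memory feedId
    ) internal pure returns (bytes32) {
        bytes memory s = bytes(feedId);
        // Expect "0x" or "0X" followed by exactly 64 hex digits.
        if (
            s.length != 66 ||
            uint8(s[0]) != 48 ||
            (uint8(s[1]) != 120 && uint8(s[1]) != 88)
        ) {
            revert InvalidFeedIdString();
        }
        uint256 acc;
        for (uint256 i = 2; i < 66; i++) {
            acc = (acc << 4) | _nibble(uint8(s[i]));
        }
        return bytes32(acc);
    }

    // Converts one ASCII hex character code to its 4-bit value.
    function _nibble(uint8 c) private pure returns (uint256) {
        if (c >= 48 && c <= 57) return c - 48; // '0'..'9'
        if (c >= 97 && c <= 102) return c - 87; // 'a'..'f'
        if (c >= 65 && c <= 70) return c - 55; // 'A'..'F'
        revert InvalidFeedIdString();
    }
}
```

A contract could then compare FeedIdCodec.toBytes32(feedIds[0]) with the feedId field of a decoded report. Parsing the string, rather than formatting the bytes32 value as text, keeps the comparison independent of letter case in the hex digits.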

Optional: Handle Data Streams fetching errors offchain with checkErrorHandler

When Automation detects the triggering event, it runs the checkLog function of your upkeep contract, which reverts with the StreamsLookup custom error. This revert enables your upkeep to fetch a report from the Data Streams Aggregation Network. If the report is fetched successfully, the checkCallback function is evaluated offchain. Otherwise, the checkErrorHandler function is evaluated offchain to determine what Automation should do next.

In this example, checkErrorHandler always returns true for upkeepNeeded, so Automation always attempts to perform the upkeep, even if fetching the report fails. You can modify the checkErrorHandler function to handle errors offchain in a way that works for your specific use case. Read more about using the StreamsLookup error handler.
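
As one conservative variant, the handler could decline the upkeep whenever fetching fails, since the example's performUpkeep expects performData to contain signed reports. The contract below is a hypothetical sketch of that choice, not the handler deployed in this guide; a real handler might instead branch on errCode using the values listed in the StreamsLookup error handler guide.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity 0.8.19;

// Hypothetical variant for illustration; not the handler deployed in this guide.
contract SkipOnErrorHandler {
    // Returns upkeepNeeded = false so that Automation does not call performUpkeep
    // when fetching the report failed and there is no report to verify.
    function checkErrorHandler(
        uint256 /*errCode*/,
        bytes memory /*extraData*/
    ) external pure returns (bool upkeepNeeded, bytes memory performData) {
        return (false, "");
    }
}
```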
