Streams Direct SDK (Go)

This reference documents the Data Streams SDK for Go, a client library that provides a domain-oriented abstraction over the Data Streams API. It supports both point-in-time data retrieval and real-time data streaming with built-in fault tolerance.

streams

Installation

import streams "github.com/smartcontractkit/data-streams-sdk/go"

Types

Client interface

Interface for interacting with the Data Streams API. Use the New function to create a new instance of the client.

Interface Methods
  • GetFeeds: Lists all streams available to you.

    GetFeeds(ctx context.Context) (r []*Feed, err error)
    
  • GetLatestReport: Fetches the latest report available for the specified FeedID.

    GetLatestReport(ctx context.Context, id FeedID) (r *Report, err error)
    
  • GetReports: Fetches reports for the specified stream IDs and a given timestamp.

    GetReports(ctx context.Context, ids []FeedID, timestamp uint64) ([]*Report, error)
    
  • GetReportPage: Paginates the reports for a specified FeedID starting from a given timestamp.

    GetReportPage(ctx context.Context, id FeedID, startTS uint64) (*ReportPage, error)
    
  • Stream: Creates a real-time report stream for specified stream IDs.

    Stream(ctx context.Context, feedIDs []FeedID) (Stream, error)
    

Config struct

Specifies the client configuration and dependencies. If you set the Logger function, the client logs informational activity through it.

type Config struct {
	ApiKey             string                        // Client API key
	ApiSecret          string                        // Client API secret
	RestURL            string                        // REST API URL
	WsURL              string                        // WebSocket API URL
	WsHA               bool                          // Use concurrent connections to multiple Streams servers
	WsMaxReconnect     int                           // Maximum number of reconnection attempts for the Stream's underlying connections
	LogDebug           bool                          // Log debug information
	InsecureSkipVerify bool                          // Skip server certificate chain and host name verification
	Logger             func(format string, a ...any) // Logger function

	// InspectHttpResponse intercepts HTTP responses for REST requests.
	// The response object must not be modified.
	InspectHttpResponse func(*http.Response)
}
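
Logger and InspectHttpResponse are plain function values, so any function with a matching signature can be supplied. The following is a minimal sketch that uses only the standard library and does not construct a Config. The names newPrefixedLogger, statusCounter, and demoHooks are hypothetical and not part of the SDK.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"sync"
)

// newPrefixedLogger returns a function with the same signature as the Logger
// field. The helper name and the prefix behaviour are hypothetical.
func newPrefixedLogger(w io.Writer, prefix string) func(format string, a ...any) {
	var mu sync.Mutex
	return func(format string, a ...any) {
		mu.Lock()
		defer mu.Unlock()
		fmt.Fprint(w, prefix)
		fmt.Fprintf(w, format, a...)
		fmt.Fprintln(w)
	}
}

// statusCounter is a hypothetical read-only observer for REST responses.
type statusCounter struct {
	mu     sync.Mutex
	counts map[int]int
}

// Inspect matches the InspectHttpResponse signature. It only reads the status
// code and never modifies the response.
func (c *statusCounter) Inspect(resp *http.Response) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[resp.StatusCode]++
}

// demoHooks exercises both hooks without any network access.
func demoHooks() (string, int) {
	var buf bytes.Buffer
	logger := newPrefixedLogger(&buf, "[streams] ")
	counter := &statusCounter{counts: map[int]int{}}

	logger("request finished with status %d", 200)
	counter.Inspect(&http.Response{StatusCode: http.StatusOK})

	return buf.String(), counter.counts[http.StatusOK]
}

func main() {
	logged, okCount := demoHooks()
	fmt.Print(logged)
	fmt.Println("200 responses seen:", okCount)
}
```

In real code, the returned logger and counter.Inspect could be assigned to the Logger and InspectHttpResponse fields of Config.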

CtxKey string

Custom context key type used for passing additional headers.

type CtxKey string
  • Constants:

    • CustomHeadersCtxKey: Key for passing custom HTTP headers.
    const (
      // CustomHeadersCtxKey is the key used in the context.Context object
      // to pass custom HTTP headers, as an http.Header, to the client.
      // Custom header values overwrite client headers that have the same key.
      CustomHeadersCtxKey CtxKey = "CustomHeaders"
    )
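
The context-based header mechanism described above can be sketched with the standard library alone. CtxKey and CustomHeadersCtxKey are redefined locally so the example is self-contained. The helpers withCustomHeaders and applyCustomHeaders are hypothetical, and applyCustomHeaders is only an assumption about how a client might merge the headers, not the SDK's implementation.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
)

// CtxKey and CustomHeadersCtxKey mirror the SDK declarations. They are local
// copies for illustration; real code should use the SDK's own key, because
// context lookups also match on the key's type.
type CtxKey string

const CustomHeadersCtxKey CtxKey = "CustomHeaders"

// withCustomHeaders (hypothetical helper) stores an http.Header in the context.
func withCustomHeaders(ctx context.Context, h http.Header) context.Context {
	return context.WithValue(ctx, CustomHeadersCtxKey, h)
}

// applyCustomHeaders (hypothetical, assumed behaviour) overwrites request
// headers with the custom values that share the same key.
func applyCustomHeaders(ctx context.Context, req *http.Request) {
	custom, ok := ctx.Value(CustomHeadersCtxKey).(http.Header)
	if !ok {
		return
	}
	for key, values := range custom {
		req.Header.Del(key)
		for _, v := range values {
			req.Header.Add(key, v)
		}
	}
}

// demoHeaders builds a request, applies custom headers, and returns the result.
func demoHeaders() (userAgent, source string) {
	custom := http.Header{}
	custom.Set("User-Agent", "my-app/1.0")
	custom.Set("X-Request-Source", "docs-example")
	ctx := withCustomHeaders(context.Background(), custom)

	// The URL is a placeholder; no request is sent.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://example.invalid/reports", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("User-Agent", "sdk-default/0.0")

	applyCustomHeaders(req.Context(), req)
	return req.Header.Get("User-Agent"), req.Header.Get("X-Request-Source")
}

func main() {
	ua, src := demoHeaders()
	fmt.Println("User-Agent:", ua)
	fmt.Println("X-Request-Source:", src)
}
```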
    

ReportPage struct

Represents a paginated response of reports.

type ReportPage struct {
	Reports    []*Report // Slice of Report, representing individual report entries.
	NextPageTS uint64 // Timestamp for the next page, used for fetching subsequent pages.
}
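
A page-by-page walk that feeds NextPageTS back in as the next start timestamp might look like the sketch below. Report and ReportPage are simplified local stand-ins, pageFetcher stands in for a call such as GetReportPage bound to one stream ID, and the stop conditions (an empty page, a cursor that does not advance, a page limit) are assumptions rather than documented SDK behaviour.

```go
package main

import (
	"context"
	"fmt"
)

// Report and ReportPage are simplified stand-ins for the SDK types.
type Report struct {
	ObservationsTimestamp uint64
}

type ReportPage struct {
	Reports    []*Report
	NextPageTS uint64
}

// pageFetcher is a hypothetical stand-in for a page request for one stream.
type pageFetcher func(ctx context.Context, startTS uint64) (*ReportPage, error)

// collectReports follows NextPageTS until a page is empty, the cursor stops
// advancing, or maxPages is reached. All three stop rules are assumptions.
func collectReports(ctx context.Context, fetch pageFetcher, startTS uint64, maxPages int) ([]*Report, error) {
	var all []*Report
	ts := startTS
	for i := 0; i < maxPages; i++ {
		page, err := fetch(ctx, ts)
		if err != nil {
			return all, err
		}
		if len(page.Reports) == 0 {
			break
		}
		all = append(all, page.Reports...)
		if page.NextPageTS <= ts {
			break // guard against a cursor that does not move forward
		}
		ts = page.NextPageTS
	}
	return all, nil
}

// newFakeFetcher serves reports from memory, pageSize at a time. Setting
// NextPageTS to the last timestamp plus one is a choice made for this fake.
func newFakeFetcher(timestamps []uint64, pageSize int) pageFetcher {
	return func(ctx context.Context, startTS uint64) (*ReportPage, error) {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		page := &ReportPage{NextPageTS: startTS}
		for _, ts := range timestamps {
			if ts < startTS {
				continue
			}
			if len(page.Reports) == pageSize {
				break
			}
			page.Reports = append(page.Reports, &Report{ObservationsTimestamp: ts})
			page.NextPageTS = ts + 1
		}
		return page, nil
	}
}

// demoPagination collects five in-memory reports in pages of two.
func demoPagination() (int, error) {
	fetch := newFakeFetcher([]uint64{100, 101, 102, 103, 104}, 2)
	reports, err := collectReports(context.Background(), fetch, 100, 10)
	return len(reports), err
}

func main() {
	n, err := demoPagination()
	fmt.Println("reports collected:", n, "error:", err)
}
```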

ReportResponse struct

Represents the report envelope that contains the full report payload, its stream ID, and timestamps. Use the report.Decode function to decode the FullReport payload.

type ReportResponse struct {
    FeedID                feed.ID `json:"feedID"`
    FullReport            []byte  `json:"fullReport"`
    ValidFromTimestamp    uint64  `json:"validFromTimestamp"`
    ObservationsTimestamp uint64  `json:"observationsTimestamp"`
}
Methods
  • MarshalJSON: Serializes the ReportResponse into JSON.

    func (r *ReportResponse) MarshalJSON() ([]byte, error)
    
  • String: Returns the string representation of the ReportResponse.

    func (r *ReportResponse) String() (s string)
    
  • UnmarshalJSON: Deserializes the ReportResponse from JSON.

    func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)
    

Stats struct

Statistics related to the Stream's operational metrics.

type Stats struct {
	Accepted              uint64 // Total number of accepted reports
	Deduplicated          uint64 // Total number of deduplicated reports when in HA
	TotalReceived         uint64 // Total number of received reports
	PartialReconnects     uint64 // Total number of partial reconnects when in HA
	FullReconnects        uint64 // Total number of full reconnects
	ConfiguredConnections uint64 // Number of configured connections if in HA
	ActiveConnections     uint64 // Current number of active connections
}
Methods
  • String: Returns a string representation of the Stats.

    func (s Stats) String() (st string)
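
The counters in Stats lend themselves to simple health checks. The sketch below copies the struct locally and derives two values from it. The helpers dedupShare and isDegraded are hypothetical, and reading "fewer active than configured connections" as a degraded state is an assumption made for illustration.

```go
package main

import "fmt"

// Stats is a local copy of the documented struct so the sketch compiles alone.
type Stats struct {
	Accepted              uint64
	Deduplicated          uint64
	TotalReceived         uint64
	PartialReconnects     uint64
	FullReconnects        uint64
	ConfiguredConnections uint64
	ActiveConnections     uint64
}

// dedupShare (hypothetical) returns the fraction of received reports that
// were dropped as duplicates. It returns 0 when nothing has been received.
func dedupShare(s Stats) float64 {
	if s.TotalReceived == 0 {
		return 0
	}
	return float64(s.Deduplicated) / float64(s.TotalReceived)
}

// isDegraded (hypothetical) reports whether fewer connections are active
// than configured.
func isDegraded(s Stats) bool {
	return s.ActiveConnections < s.ConfiguredConnections
}

func main() {
	s := Stats{
		TotalReceived:         200,
		Deduplicated:          50,
		ConfiguredConnections: 2,
		ActiveConnections:     1,
	}
	fmt.Printf("dedup share: %.2f, degraded: %t\n", dedupShare(s), isDegraded(s))
}
```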
    

Stream interface

Interface for managing a real-time data stream.

Interface Methods
  • Read: Reads the next available report from the stream. The call blocks until one of the following occurs:

    • A report is successfully received.
    • The provided context is canceled, typically due to a timeout or a cancel signal.
    • An error state is encountered in any of the underlying connections, such as a network failure.
    Read(context.Context) (*Report, error)
    
  • Stats: Returns statistics about the stream operations as Stats.

    Stats() Stats
    
  • Close: Closes the stream.

    Close() error
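
A typical consumer calls Read in a loop until the context ends or an error is returned. The sketch below shows that pattern against an in-memory fake. Report, Stats, and Stream are simplified local stand-ins that follow the signatures listed above, while fakeStream, consume, and demoStream are hypothetical. Treating context cancellation as a clean shutdown is a design choice of this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Report, Stats, and Stream are simplified stand-ins for the SDK types.
type Report struct{ FeedID string }

type Stats struct{ Accepted uint64 }

type Stream interface {
	Read(context.Context) (*Report, error)
	Stats() Stats
	Close() error
}

// fakeStream (hypothetical) serves reports from a channel.
type fakeStream struct {
	reports  chan *Report
	accepted uint64
}

// Read blocks until a report arrives or the context is done.
func (s *fakeStream) Read(ctx context.Context) (*Report, error) {
	select {
	case r, ok := <-s.reports:
		if !ok {
			return nil, errors.New("stream closed")
		}
		s.accepted++
		return r, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func (s *fakeStream) Stats() Stats { return Stats{Accepted: s.accepted} }
func (s *fakeStream) Close() error { return nil }

// consume reads until the context ends or Read fails, then closes the stream.
// A canceled or expired context is treated as a normal stop.
func consume(ctx context.Context, s Stream, handle func(*Report)) error {
	defer s.Close()
	for {
		r, err := s.Read(ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				return nil
			}
			return err
		}
		handle(r)
	}
}

// demoStream queues three reports and consumes them under a short timeout.
func demoStream() (int, error) {
	s := &fakeStream{reports: make(chan *Report, 3)}
	for i := 0; i < 3; i++ {
		s.reports <- &Report{FeedID: fmt.Sprintf("feed-%d", i)}
	}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	handled := 0
	err := consume(ctx, s, func(*Report) { handled++ })
	return handled, err
}

func main() {
	n, err := demoStream()
	fmt.Println("reports handled:", n, "error:", err)
}
```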
    

Functions

New

Creates a new client instance with the specified configuration.

func New(cfg Config) (c Client, err error)

Note: New does not initialize any connections to the Data Streams service.

LogPrintf

Utility function for logging.

func LogPrintf(format string, a ...any)

feed

Installation

import feed "github.com/smartcontractkit/data-streams-sdk/go/feed"

Types

Feed struct

Holds the ID of a report stream.

type Feed struct {
    FeedID ID `json:"feedID"`
}

Where ID is the unique identifier for the stream.

FeedVersion uint16

Represents the stream report schema version.

type FeedVersion uint16

ID [32]byte

Represents a unique identifier for a stream.

type ID [32]byte
Methods
  • FromString: Converts a string into an ID.

    func (f *ID) FromString(s string) (err error)
    
  • MarshalJSON: Serializes the ID into JSON.

    func (f *ID) MarshalJSON() (b []byte, err error)
    
  • String: Returns the string representation of the ID.

    func (f *ID) String() (id string)
    
  • UnmarshalJSON: Deserializes the ID from JSON.

    func (f *ID) UnmarshalJSON(b []byte) (err error)
    
  • Version: Returns the version of the stream.

    func (f *ID) Version() FeedVersion
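
To make the relationship between these methods concrete, here is a self-contained sketch of a 32-byte ID with string conversion and version lookup. It is not the SDK's implementation. It assumes that IDs are written as 0x-prefixed hex and that the schema version is stored big-endian in the first two bytes, which is consistent with the Decode example in this reference (an ID starting with 0x0003 decoded as v3 data).

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"strings"
)

// ID and FeedVersion are local re-declarations for illustration only.
type ID [32]byte

type FeedVersion uint16

// exampleID is the stream ID embedded in the Decode example payload.
const exampleID = "0x00030ab7d02fbba9c6304f98824524407b1f494741174320cfd17a2c22eec1de"

// FromString parses a hex string, with or without a 0x prefix (assumed format).
func (f *ID) FromString(s string) error {
	b, err := hex.DecodeString(strings.TrimPrefix(s, "0x"))
	if err != nil {
		return err
	}
	if len(b) != len(f) {
		return fmt.Errorf("invalid ID length: got %d bytes, want %d", len(b), len(f))
	}
	copy(f[:], b)
	return nil
}

// String renders the ID as 0x-prefixed lowercase hex (assumed format).
func (f *ID) String() string {
	return "0x" + hex.EncodeToString(f[:])
}

// Version reads the first two bytes as a big-endian uint16 (assumed layout).
func (f *ID) Version() FeedVersion {
	return FeedVersion(binary.BigEndian.Uint16(f[:2]))
}

// parseVersion (hypothetical helper) parses s and returns its schema version.
func parseVersion(s string) (FeedVersion, error) {
	var id ID
	if err := id.FromString(s); err != nil {
		return 0, err
	}
	return id.Version(), nil
}

func main() {
	var id ID
	if err := id.FromString(exampleID); err != nil {
		panic(err)
	}
	fmt.Println("id:", id.String())
	fmt.Println("version:", id.Version())
}
```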
    

report

Installation

import report "github.com/smartcontractkit/data-streams-sdk/go/report"

Types

Data interface

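Interface that represents the actual report data and attributes. It is used as a generic type constraint: the type argument must be one of the versioned Data types listed below and must provide a Schema method.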

type Data interface {
    v1.Data | v2.Data | v3.Data | v4.Data
    Schema() abi.Arguments
}

Report struct

Represents a decoded report: the typed report data (Data) together with the report context, the raw report blob, and the raw signature components (RawRs, RawSs, RawVs).

type Report[T Data] struct {
    Data          T
    ReportContext [3][32]byte
    ReportBlob    []byte
    RawRs         [][32]byte
    RawSs         [][32]byte
    RawVs         [32]byte
}

Functions

Decode

Decodes the report serialized bytes and its data.

func Decode[T Data](fullReport []byte) (r *Report[T], err error)

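Example (assumes that the encoding/hex and os standard packages, the streams and report packages, and the v3 report schema package are imported):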

payload, _ := hex.DecodeString(
    `0006bd87830d5f336e205cf5c63329a1dab8f5d56812eaeb7c69300e66ab8e22000000000000000000000000000000000000000000000000000000000cf7ed13000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e0000000000000000000000000000000000000000000000000000000000000022000000000000000000000000000000000000000000000000000000000000003000101000101000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000012000030ab7d02fbba9c6304f98824524407b1f494741174320cfd17a2c22eec1de0000000000000000000000000000000000000000000000000000000066a8f5c60000000000000000000000000000000000000000000000000000000066a8f5c6000000000000000000000000000000000000000000000000000057810653dd9000000000000000000000000000000000000000000000000000541315da76d6100000000000000000000000000000000000000000000000000000000066aa474600000000000000000000000000000000000000000000000009a697ee4230350400000000000000000000000000000000000000000000000009a6506d1426d00000000000000000000000000000000000000000000000000009a77d03ae355fe0000000000000000000000000000000000000000000000000000000000000000672bac991f5233df89f581dc02a89dd8d48419e3558b247d3e65f4069fa45c36658a5a4820dc94fc47a88a21d83474c29ee38382c46b6f9a575b9ce8be4e689c03c76fac19fbec4a29dba704c72cc003a6be1f96af115e322321f0688e24720a5d9bd7136a1d96842ec89133058b888b2e6572b5d4114de2426195e038f1c9a5ce50016b6f5a5de07e08529b845e1c622dcbefa0cfa2ffd128e9932ecee8efd869bc56d09a50ceb360a8d366cfa8eefe3f64279c88bdbc887560efa9944238eb000000000000000000000000000000000000000000000000000000000000000060e2a800f169f26164533c7faff6c9073cd6db240d89444d3487113232f9c31422a0993bb47d56807d0dc26728e4c8424bb9db77511001904353f1022168723010c46627c890be6e701e766679600696866c888ec80e7dbd428f5162a24f2d8262f846bdb06d9e46d295dd8e896fb232be80534b0041660fe4450a7ede9bc3b230722381773a4ae81241568867a759f53c2bdd05d32b209e78845fc58203949e50a608942b270c456001e578227ad00861cf5f47b27b09137a0c4b7f8b4746cef`)

// Decode the payload using the v3 report schema. The result is named
// decoded so that it does not shadow the report package.
decoded, err := report.Decode[v3.Data](payload)
if err != nil {
    streams.LogPrintf("error decoding report: %s", err)
    os.Exit(1)
}

// Log the decoded fields of the v3 report.
streams.LogPrintf(
    "FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, Benchmark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d",
    decoded.Data.FeedID.String(),
    decoded.Data.FeedID.Version(),
    decoded.Data.Bid.String(),
    decoded.Data.Ask.String(),
    decoded.Data.BenchmarkPrice.String(),
    decoded.Data.LinkFee.String(),
    decoded.Data.NativeFee.String(),
    decoded.Data.ValidFromTimestamp,
    decoded.Data.ExpiresAt,
)
