Streaming Updates

Streaming Updates

Overview

RTK Query gives you the ability to receive streaming updates for persistent queries. This enables a query to establish an ongoing connection to the server (typically using WebSockets), and apply updates to the cached data as additional information is received from the server.

Streaming updates can be used to enable the API to receive real-time updates to the back-end data, such as new entries being created, or important properties being updated.

To enable streaming updates for a query, pass the asynchronous onCacheEntryAdded function to the query, including the logic for how to update the query when streamed data is received. See onCacheEntryAdded API reference for more details.

When to use streaming updates

Primarily updates to query data should be done via polling intermittently on an interval, using cache invalidation to invalidate data based on tags associated with queries & mutations, or with refetchOnMountOrArgChange to fetch fresh data when a component using the data mounts.

However, streaming updates is particularly useful for scenarios involving:

  • Small, frequent changes to large objects. Rather than repeatedly polling for a large object repeatedly, the object can be fetched with an initial query, and streaming updates can update individual properties as updates are received.

  • External event-driven updates. Where data may be changed by the server or otherwise external users and where real-time updates are expected to be shown to an active user, polling alone would result in periods of stale data in between queries, causing state to easily get out of sync. Streaming updates can update all active clients as the updates occur rather than waiting for the next interval to elapse.

Example use cases that benefit from streaming updates are:

  • GraphQL subscriptions

  • Real-time chat applications

  • Real-time multiplayer games

  • Collaborative document editing with multiple concurrent users

Using the onCacheEntryAdded Lifecycle

The onCacheEntryAdded lifecycle callback lets you write arbitrary async logic that will be executed after a new cache entry is added to the RTK Query cache (ie, after a component has created a new subscription to a given endpoint+params combination).

onCacheEntryAdded will be called with two arguments: the arg that was passed to the subscription, and an options object containing "lifecycle promises" and utility functions. You can use these to write sequenced logic that waits for data to be added, initiates server connections, applies partial updates, and cleans up the connection when the query subscription is removed.

Typically, you will await cacheDataLoaded to determine when the first data has been fetched, then use the updateCacheData utility to apply streaming updates as messages are received. updateCacheData is an Immer-powered callback that receives a draft of the current cache value. You may "mutate" the draft value to update it as needed based on the received values. RTK Query will then dispatch an action that applies a diffed patch based on those changes.

Finally, you can await cacheEntryRemoved to know when to clean up any server connections.

Streaming Update Examples

Websocket Chat API

// file: schemaValidators.ts noEmit
import type { Message } from './api'

export function isMessage(message: unknown): message is Message {
  // in real code this would check `message` to ensure it is a `Message`
  return true
}

// file: api.ts
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
  id: number
  channel: Channel
  userName: string
  text: string
}

export const api = createApi({
  baseQuery: fetchBaseQuery({ baseUrl: '/' }),
  endpoints: (build) => ({
    getMessages: build.query<Message[], Channel>({
      query: (channel) => `messages/${channel}`,
      // highlight-start
      async onCacheEntryAdded(
        arg,
        { updateCachedData, cacheDataLoaded, cacheEntryRemoved }
      ) {
        // create a websocket connection when the cache subscription starts
        const ws = new WebSocket('ws://localhost:8080')
        try {
          // wait for the initial query to resolve before proceeding
          await cacheDataLoaded

          // when data is received from the socket connection to the server,
          // if it is a message and for the appropriate channel,
          // update our query result with the received message
          const listener = (event: MessageEvent) => {
            const data = JSON.parse(event.data)
            if (!isMessage(data) || data.channel !== arg) return

            updateCachedData((draft) => {
              draft.push(data)
            })
          }

          ws.addEventListener('message', listener)
        } catch {
          // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
          // in which case `cacheDataLoaded` will throw
        }
        // cacheEntryRemoved will resolve when the cache subscription is no longer active
        await cacheEntryRemoved
        // perform cleanup steps once the `cacheEntryRemoved` promise resolves
        ws.close()
      },
      // highlight-end
    }),
  }),
})

export const { useGetMessagesQuery } = api

What to expect

When the getMessages query is triggered (e.g. via a component mounting with the useGetMessagesQuery() hook), a cache entry will be added based on the serialized arguments for the endpoint. The associated query will be fired off based on the query property to fetch the initial data for the cache. Meanwhile, the asynchronous onCacheEntryAdded callback will begin, and create a new WebSocket connection. Once the response for the initial query is received, the cache will be populated with the response data, and the cacheDataLoaded promise will resolve. After awaiting the cacheDataLoaded promise, the message event listener will be added to the WebSocket connection, which updates the cache data when an associated message is received.

When there are no more active subscriptions to the data (e.g. when the subscribed components remain unmounted for a sufficient amount of time), the cacheEntryRemoved promise will resolve, allowing the remaining code to run and close the websocket connection. RTK Query will also remove the associated data from the cache.

If a query for the corresponding cache entry runs later, it will overwrite the whole cache entry, and the streaming update listeners will continue to work on the updated data.

Websocket Chat API with a transformed response shape

// file: schemaValidators.ts noEmit
import type { Message } from './api'

export function isMessage(message: unknown): message is Message {
  // in real code this would check `message` to ensure it is a `Message`

  return true
}

// file: api.ts
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { createEntityAdapter, EntityState } from '@reduxjs/toolkit'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
  id: number
  channel: Channel
  userName: string
  text: string
}

// highlight-start
const messagesAdapter = createEntityAdapter<Message>()
// highlight-end
export const api = createApi({
  baseQuery: fetchBaseQuery({ baseUrl: '/' }),
  endpoints: (build) => ({
    // highlight-start
    getMessages: build.query<EntityState<Message>, Channel>({
      // highlight-end
      query: (channel) => `messages/${channel}`,
      // highlight-start
      transformResponse(response: Message[]) {
        return messagesAdapter.addMany(
          messagesAdapter.getInitialState(),
          response
        )
      },
      // highlight-end
      async onCacheEntryAdded(
        arg,
        { updateCachedData, cacheDataLoaded, cacheEntryRemoved }
      ) {
        const ws = new WebSocket('ws://localhost:8080')
        try {
          await cacheDataLoaded

          const listener = (event: MessageEvent) => {
            const data = JSON.parse(event.data)
            if (!isMessage(data) || data.channel !== arg) return

            updateCachedData((draft) => {
              // highlight-start
              messagesAdapter.upsertOne(draft, data)
              // highlight-end
            })
          }

          ws.addEventListener('message', listener)
        } catch {}
        await cacheEntryRemoved
        ws.close()
      },
    }),
  }),
})

export const { useGetMessagesQuery } = api

This example demonstrates how the previous example can be altered to allow for transforming the response shape when adding data to the cache.

For example, the data is transformed from this shape:

[
  {
    id: 0
    channel: 'redux'
    userName: 'Mark'
    text: 'Welcome to #redux!'
  },
  {
    id: 1
    channel: 'redux'
    userName: 'Lenz'
    text: 'Glad to be here!'
  },
]

To this:

{
  // The unique IDs of each item. Must be strings or numbers
  ids: [0, 1],
  // A lookup table mapping entity IDs to the corresponding entity objects
  entities: {
    0: {
      id: 0,
      channel: "redux",
      userName: "Mark",
      text: "Welcome to #redux!",
    },
    1: {
      id: 1,
      channel: "redux",
      userName: "Lenz",
      text: "Glad to be here!",
    },
  },
};

A key point to keep in mind is that updates to the cached data within the onCacheEntryAdded callback must respect the transformed data shape which will be present for the cached data. The example shows how createEntityAdapter can be used for the initial transformResponse, and again when streamed updates are received to upsert received items into the cached data, while maintaining the normalized state structure.

Last updated