Streaming Updates
Overview
RTK Query gives you the ability to receive streaming updates for persistent queries. This enables a query to establish an ongoing connection to the server (typically using WebSockets), and apply updates to the cached data as additional information is received from the server.
Streaming updates can be used to enable the API to receive real-time updates to the back-end data, such as new entries being created, or important properties being updated.
To enable streaming updates for a query, define an asynchronous onCacheEntryAdded function on the query endpoint, containing the logic for how to update the cached data when streamed data is received. See the onCacheEntryAdded API reference for more details.
When to use streaming updates
In most cases, query data should be kept up to date by polling on an interval, by using cache invalidation to invalidate data based on tags associated with queries and mutations, or by using refetchOnMountOrArgChange to fetch fresh data when a component using the data mounts.
However, streaming updates are particularly useful for scenarios involving:
Small, frequent changes to large objects. Rather than repeatedly polling for a large object, the object can be fetched with an initial query, and streaming updates can update individual properties as changes are received.
External event-driven updates. Where data may be changed by the server or by other users, and where an active user expects to see those changes in real time, polling alone leaves periods of stale data between requests, so client state can easily get out of sync. Streaming updates can update all active clients as the changes occur rather than waiting for the next polling interval to elapse.
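As a rough illustration of the first scenario, the sketch below applies a small streamed change to one property of a large object instead of fetching the whole object again. The Dashboard, Widget, and WidgetUpdate shapes and the applyWidgetUpdate helper are hypothetical names chosen for this example; in RTK Query, logic like this would run against the draft passed to the cache update callback.

```typescript
// Hypothetical shapes, for illustration only.
interface Widget {
  label: string
  value: number
}

interface Dashboard {
  id: string
  title: string
  widgets: Record<string, Widget>
}

// A small streamed update that touches a single property.
interface WidgetUpdate {
  widgetId: string
  value: number
}

// Mutates only the property that changed; the rest of the object is untouched.
function applyWidgetUpdate(draft: Dashboard, update: WidgetUpdate): void {
  const widget: Widget | undefined = draft.widgets[update.widgetId]
  // Ignore updates for widgets that the initial query did not return.
  if (widget !== undefined) {
    widget.value = update.value
  }
}
```

The initial query still fetches the full object once; only the incremental changes travel over the streaming connection.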
Example use cases that benefit from streaming updates are:
GraphQL subscriptions
Real-time chat applications
Real-time multiplayer games
Collaborative document editing with multiple concurrent users
Using the onCacheEntryAdded Lifecycle
The onCacheEntryAdded lifecycle callback lets you write arbitrary async logic that will be executed after a new cache entry is added to the RTK Query cache (i.e., after a component has created a new subscription to a given endpoint+params combination).
onCacheEntryAdded will be called with two arguments: the arg that was passed to the subscription, and an options object containing "lifecycle promises" and utility functions. You can use these to write sequenced logic that waits for data to be added, initiates server connections, applies partial updates, and cleans up the connection when the query subscription is removed.
Typically, you will await cacheDataLoaded to determine when the first data has been fetched, then use the updateCachedData utility to apply streaming updates as messages are received. updateCachedData is an Immer-powered callback that receives a draft of the current cache value. You may "mutate" the draft value to update it as needed based on the received values. RTK Query will then dispatch an action that applies a diffed patch based on those changes.
Finally, you can await cacheEntryRemoved to know when to clean up any server connections.
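The sequence described above can be sketched as follows. This is a minimal outline, not RTK Query's own code: CacheLifecycle is a local stand-in that mirrors only the three lifecycle members used here, and UpdateSource and streamUpdates are hypothetical names for whatever connection and handler an application uses. The sketch assumes the cached value is an array that new items are appended to.

```typescript
// Local stand-in for the part of the lifecycle options object used here.
// Not imported from RTK Query; the member names mirror its lifecycle API.
interface CacheLifecycle<Data> {
  cacheDataLoaded: Promise<{ data: Data }>
  cacheEntryRemoved: Promise<void>
  updateCachedData: (recipe: (draft: Data) => void) => void
}

// Hypothetical event source: subscribe returns an unsubscribe function.
interface UpdateSource<Item> {
  subscribe(listener: (item: Item) => void): () => void
}

async function streamUpdates<Item>(
  source: UpdateSource<Item>,
  { cacheDataLoaded, cacheEntryRemoved, updateCachedData }: CacheLifecycle<Item[]>,
): Promise<void> {
  let unsubscribe: () => void = () => {}
  try {
    // 1. Wait until the initial query has populated the cache entry.
    await cacheDataLoaded
    // 2. Apply each streamed item by "mutating" the draft.
    unsubscribe = source.subscribe((item) => {
      updateCachedData((draft) => {
        draft.push(item)
      })
    })
  } catch {
    // cacheDataLoaded rejects if the entry is removed before any data
    // arrives, in which case there is nothing to stream into.
  }
  // 3. Wait until the cache entry is removed, then clean up.
  await cacheEntryRemoved
  unsubscribe()
}
```

In an endpoint definition, the body of a function like this would typically live inside onCacheEntryAdded, which receives the lifecycle members in its second argument.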
Streaming Update Examples
Websocket Chat API
What to expect
When the getMessages query is triggered (e.g. via a component mounting with the useGetMessagesQuery() hook), a cache entry will be added based on the serialized arguments for the endpoint. The associated query will be fired off based on the query property to fetch the initial data for the cache. Meanwhile, the asynchronous onCacheEntryAdded callback will begin, and create a new WebSocket connection. Once the response for the initial query is received, the cache will be populated with the response data, and the cacheDataLoaded promise will resolve. After awaiting the cacheDataLoaded promise, the message event listener will be added to the WebSocket connection, which updates the cache data when an associated message is received.
When there are no more active subscriptions to the data (e.g. when the subscribed components remain unmounted for a sufficient amount of time), the cacheEntryRemoved promise will resolve, allowing the remaining code to run and close the WebSocket connection. RTK Query will also remove the associated data from the cache.
If a query for the corresponding cache entry runs later, it will overwrite the whole cache entry, and the streaming update listeners will continue to work on the updated data.
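The message event listener described above might look roughly like the following sketch. The Message shape, the isMessage guard, and the createMessageListener helper are assumptions made for this illustration, as is the idea that the server sends JSON-encoded messages tagged with a channel; the update callback is typed locally rather than imported from RTK Query.

```typescript
// Hypothetical message shape; the real shape depends on the server.
interface Message {
  id: number
  channel: string
  userName: string
  text: string
}

// Runtime check, because data arriving over a socket is untyped.
function isMessage(value: unknown): value is Message {
  if (typeof value !== 'object' || value === null) return false
  const candidate = value as Record<string, unknown>
  return (
    typeof candidate['id'] === 'number' &&
    typeof candidate['channel'] === 'string' &&
    typeof candidate['userName'] === 'string' &&
    typeof candidate['text'] === 'string'
  )
}

// Builds a "message" listener that appends valid messages for one channel.
function createMessageListener(
  channel: string,
  updateCachedData: (recipe: (draft: Message[]) => void) => void,
): (event: { data: unknown }) => void {
  return (event) => {
    let parsed: unknown
    try {
      parsed = typeof event.data === 'string' ? JSON.parse(event.data) : undefined
    } catch {
      return // Payload was not valid JSON; ignore it.
    }
    // Skip anything that is not a message for the subscribed channel.
    if (!isMessage(parsed) || parsed.channel !== channel) return
    const message: Message = parsed
    updateCachedData((draft) => {
      draft.push(message)
    })
  }
}
```

Inside onCacheEntryAdded, a listener like this would be attached to the socket with addEventListener('message', ...) after awaiting cacheDataLoaded, and the socket would be closed after awaiting cacheEntryRemoved.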
Websocket Chat API with a transformed response shape
This example demonstrates how the previous example can be altered to allow for transforming the response shape when adding data to the cache.
For example, the data is transformed from the array of messages returned by the server into the normalized state structure managed by createEntityAdapter.
A key point to keep in mind is that updates to the cached data within the onCacheEntryAdded callback must respect the transformed data shape which will be present for the cached data. The example shows how createEntityAdapter can be used for the initial transformResponse, and again when streamed updates are received to upsert received items into the cached data, while maintaining the normalized state structure.
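To make the shape requirement concrete, the sketch below hand-rolls the two steps without importing createEntityAdapter: normalizing the initial array response, and upserting a streamed item into the same normalized structure. The Message shape and the normalizeMessages and upsertMessage helpers are hypothetical stand-ins; in an application the adapter's own methods (for example setAll and upsertOne) would typically do this work.

```typescript
// Hypothetical message shape; the real shape depends on the server.
interface Message {
  id: number
  channel: string
  userName: string
  text: string
}

// Normalized shape: an ordered list of ids plus a lookup table by id.
interface MessagesState {
  ids: number[]
  entities: Record<number, Message>
}

// Stand-in for the update applied to the cached draft when a message streams in.
function upsertMessage(draft: MessagesState, message: Message): void {
  const existing: Message | undefined = draft.entities[message.id]
  if (existing === undefined) {
    // New item: record its id and store it.
    draft.ids.push(message.id)
    draft.entities[message.id] = message
  } else {
    // Known item: shallow-merge the incoming fields over the stored ones.
    draft.entities[message.id] = { ...existing, ...message }
  }
}

// Stand-in for the initial transformResponse: array in, normalized state out.
function normalizeMessages(response: Message[]): MessagesState {
  const state: MessagesState = { ids: [], entities: {} }
  for (const message of response) {
    upsertMessage(state, message)
  }
  return state
}
```

Because both the initial transform and the streamed updates go through the same normalized structure, components reading the cache see a consistent shape regardless of how an item arrived.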