Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to update returned hook data from RTK Query when a websocket message is received

Please can someone explain how to wire up the receiving of data in a websocket message to the data object of the hook from an RTK Query api endpoint?

We don't need to store the message that is received, we just want to pass it on in the data argument of the useGetWebsocketResponseQuery hook so we can trigger a notification in the UI.

reducerPath: 'someApi',
    baseQuery: baseQueryWithReauth,
    endpoints: (builder) => ({
        getWebsocketResponse: builder.query<WebsocketResult, void>({
            queryFn: () => ({data: {}),
            async onCacheEntryAdded(arg, { updateCachedData, cacheDataLoaded, cacheEntryRemoved }) {        
                try {                  
                    // wait for the initial query to resolve before proceeding
                    await cacheDataLoaded;

                    const socket = io('http://url', {});                
                    socket.on('connect', () => {
                        console.log('socket connected on rtk query');
                    });

                    socket.on('message', (message) => {
                        console.log(`received message: ${message}`);
                        // THIS IS WHERE THE DATA NEEDS TO BE WIRED UP TO THE HOOK BUT HOW?
                    });

                    await cacheEntryRemoved;        
                } catch {
                    // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
                    // in which case `cacheDataLoaded` will throw
                }                 
            }
        }),
// Export hooks for usage in functional components, which are
// auto-generated based on the defined endpoints
export const {
    useGetWebsocketResponseQuery
} = someApi;

It feels like this should be possible in the socket.on('message', {}) handler but how? The updateCachedData method seems like it should be the way to go but I am not sure how to implement it.

All help gratefully received :-)

Much thanks,

Sam


UPDATE with solution from @phry

The issue was that the data defined in the queryFn needed to match the shape of what was appended from the cacheDataLoaded i.e. it should be like this:-

queryFn: () => ({data: { messages: [] }),

and

socket.on('connect', () => {                        
   updateCachedData((currentCacheData) => {
      currentCacheData.messages.push(message);
   });
});
like image 924
SamBrick Avatar asked Dec 12 '25 04:12

SamBrick


1 Answers

after reading the redux toolkit Docs I found so far that you should use createEntityAdapter to create a DB Schema for cached data and after you receive a message from the socket you update that DB (the cache) with CRUD functions and for example first the API query fires and fetch the data you may use addOne or addMany of createEntityAdapter and then when data received from the socket you may use setOne for setMany to update the cache data inside of updateCachedData. by doing so the actual data in use[yourWhatEver]Query gets updated first when query fetch API and then each time data received from the socket.

import { createApi } from '@reduxjs/toolkit/query/react';
import axiosBaseQuery from 'api/axiosBaseQuery';
import { createEntityAdapter } from '@reduxjs/toolkit';

const instrumentsAdapter = createEntityAdapter({
    selectId: (item) => item?.state?.symbol
});

export const marketApi = createApi({
    reducerPath: 'api/market',
    baseQuery: axiosBaseQuery(),
    endpoints: (builder) => ({
        getInstrumentByRefId: builder.query({
            query: (refId) => ({
                url: `/market/instruments/${refId}/summary`,
                method: 'get'
            }),
            transformResponse: (res) => {
                return instrumentsAdapter.addMany(instrumentsAdapter.getInitialState(), [res]);
            },
            async onCacheEntryAdded(arg, { updateCachedData, cacheDataLoaded, cacheEntryRemoved }) {
                // arg === refId
                const payload = `instruments.${arg}.summary`;

                // create a websocket connection when the cache subscription starts
                const ws = new WebSocket('wss://[domain.com]/api/notification/ws');
                let waitTimer = null;
                const waitForConnection = (callback, interval) => {
                    clearTimeout(waitTimer);

                    if (ws && ws.readyState === 1) {
                        callback();
                        return;
                    }

                    // optional: implement backoff for interval here
                    waitTimer = setTimeout(() => {
                        waitForConnection(callback, interval);
                    }, interval);
                };
                try {
                    // wait for the initial query to resolve before proceeding
                    await cacheDataLoaded;

                    // when data is received from the socket connection to the server,
                    // if it is a message and for the appropriate channel,
                    // update our query result with the received message
                    const listener = (event) => {
                        const data = JSON.parse(event.data);
                        // eslint-disable-next-line no-console
                        // console.log('data', data);
                        // if (!isMessage(data) || data.channel !== arg) return;
                        updateCachedData((draft) => {
                            // eslint-disable-next-line no-unused-vars, no-param-reassign
                            if (data.value) {
                                instrumentsAdapter.setMany(draft, [data.value]);
                            }
                        });
                    };

                    waitForConnection(() => {
                        ws.send(
                            JSON.stringify({
                                id: '1',
                                type: 'SUBSCRIBE',
                                path: payload
                            })
                        );
                    }, 100);

                    ws.onmessage = listener;
                    // ws.addEventListener('message', listener);
                } catch (err) {
                    console.log('err', err);
                    // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
                    // in which case `cacheDataLoaded` will throw
                }
                // cacheEntryRemoved will resolve when the cache subscription is no longer active
                await cacheEntryRemoved;
                // perform cleanup steps once the `cacheEntryRemoved` promise resolves
                ws.close();
            }
        }),
        getCandles: builder.query({
            query: ({ refId, bucket, end, limit = 1 }) => ({
                url: `/market/instruments/${refId}/candles?bucket=${bucket}&end=${end}&limit=${limit}`,
                method: 'get'
            })
        })
    })
});

export const {
    useGetMarketMapQuery,
    useGetInstrumentByRefIdQuery,
    useGetInstrumentsQuery,
    useGetCandlesQuery
} = marketApi;

and in the React Component you can have updated value

function MyReactComponent() {
    // data will gets updated every time socket data received.
    const { data } = useGetInstrumentByRefIdQuery('IRO1SIPA0001');

    return JSON.stringify(data);
}

I hope this will be helpful.

like image 154
Amir Rezvani Avatar answered Dec 13 '25 19:12

Amir Rezvani