1. Integration Overview

Implementing QuickNode Streams and Functions changed how Chatter Wallet detects and processes blockchain transactions, particularly USDC and SOL transfers. This document outlines our integration process, architecture decisions, and implementation details.

2. Technical Architecture

The naive approach would be to implement a custom watcher that fires whenever an external transaction touches any wallet maintained by the system, but this route has many issues. The Technical Architecture Diagram below shows how we use the low-latency, performant, and highly scalable systems developed at QuickNode to achieve a much better result by handling only the data we actually need.

graph TD;
    A["Blockchain Transaction"] --> B["QuickNode Streams"];
    B --> C["Transaction Filters"];
    C --> D["Webhook API Server"];
    D --> E["User Account Updates"];
    D --> F["Transaction Processing"];

3. QuickNode Streams Implementation

Our QuickNode streams implementation focuses on efficient transaction monitoring with minimal overhead.

Stream Configuration

const streamConfig = {
  network: 'solana-mainnet',
  filters: [
    {
      account: 'USDC_TOKEN_ADDRESS',
      type: 'token_transfer',
      direction: 'both'
    },
    {
      account: 'SOL_WALLET_ADDRESS',
      type: 'native_transfer'
    }
  ]
};

Transaction Filtering Logic


// Import dependencies, learn more at: <https://www.quicknode.com/docs/functions/runtimes/node-js-20-runtime>

/**
 * main(params) is invoked when your Function is called from Streams or API.
 * 
 * @param {Object} params - May contain a dataset (params.data), params.metadata, and params.user_data
 * 
 * @returns {Object} - A message that will be logged and returned when a Function is called via API.
 *           Tip: You can chain Functions together by calling them inside each other.
 * Learn more at: <https://www.quicknode.com/docs/functions/getting-started#overview>
 */
const VOTE_PROGRAM_ID = "Vote111111111111111111111111111111111111111";

const isVoteTransaction = (tx) => {
    return tx.transaction?.message?.instructions?.some(instruction =>
        instruction.program === "vote" || instruction.programId === VOTE_PROGRAM_ID
    );
};

function main(stream) {
    try {
        const incomingData = stream.data ? stream.data : stream;
        const blocks = Array.isArray(incomingData) ? incomingData : [incomingData];

        const storedAddresses = ["CHTRJ5pnyBsbmPjPViMhG9CzGZtne4ivifKdPwm3mR3J"];
        const addressSet = new Set(storedAddresses);

        const matchingTransactions = [];

        blocks.forEach(block => {
            if (!block.transactions || !Array.isArray(block.transactions)) {
                return;
            }

            block.transactions.forEach(transaction => {
                if (!transaction?.transaction?.message?.accountKeys) return;
                if (isVoteTransaction(transaction)) return;

                const accountKeys = transaction.transaction.message.accountKeys;

                const hasMatchingAddress = accountKeys.some(account => 
                    addressSet.has(account.pubkey)
                );

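                // A successful Solana transaction reports status { Ok: null }.
                // Optional chaining on meta avoids a TypeError when meta is missing.
                const isSuccessful = transaction.meta?.status?.Ok === null;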

                if (hasMatchingAddress && isSuccessful) {
                    const transactionInfo = {
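                        // Solana's block format usually nests signatures under transaction.transaction
                        // and exposes the blockhash on the block, so fall back to those paths.
                        signature: transaction.signatures?.[0] ?? transaction.transaction?.signatures?.[0],
                        blockhash: transaction.blockhash ?? block.blockhash,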
                        accounts: accountKeys.map(account => ({
                            address: account.pubkey,
                            isSigner: account.signer,
                            isWritable: account.writable
                        })),
                        status: transaction.meta?.status?.Ok !== undefined ? 'confirmed' : 'failed',
                        fee: transaction.meta?.fee,
                        computeUnitsConsumed: transaction.meta?.computeUnitsConsumed,
                        preBalances: transaction.meta?.preBalances,
                        postBalances: transaction.meta?.postBalances,
                        logs: transaction.meta?.logMessages
                    };

                    matchingTransactions.push(transactionInfo);
                }
            });
        });

        return matchingTransactions;
    } catch (e) {
        console.error('Error processing transaction:', e);
        return { error: e.message };
    }
}
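
The filter forwards `preBalances` and `postBalances` alongside the account list, and Solana reports both arrays in the same order as the account keys. As a rough sketch of how a consumer might turn that into a net SOL change per watched address, consider the helper below. `solDeltas`, `sampleTx`, and the sender placeholder are hypothetical names introduced for illustration; they are not part of our deployed Function.

```javascript
// Hypothetical helper, not part of the original Function.
const LAMPORTS_PER_SOL = 1_000_000_000;

/**
 * Returns the net native-SOL change for each watched address in one
 * filtered transaction. Assumes tx.accounts, tx.preBalances and
 * tx.postBalances share the same ordering, as in the filter output above.
 */
function solDeltas(tx, watched) {
    const deltas = [];
    (tx.accounts || []).forEach((account, i) => {
        if (!watched.has(account.address)) return;
        const pre = tx.preBalances?.[i];
        const post = tx.postBalances?.[i];
        // Skip entries whose balances are missing or malformed.
        if (typeof pre !== 'number' || typeof post !== 'number') return;
        deltas.push({
            address: account.address,
            lamports: post - pre,
            sol: (post - pre) / LAMPORTS_PER_SOL
        });
    });
    return deltas;
}

// Made-up transaction in the shape produced by the filter.
const sampleTx = {
    accounts: [
        { address: 'SenderAddressPlaceholder', isSigner: true, isWritable: true },
        { address: 'CHTRJ5pnyBsbmPjPViMhG9CzGZtne4ivifKdPwm3mR3J', isSigner: false, isWritable: true }
    ],
    preBalances: [2_000_005_000, 1_000_000_000],
    postBalances: [500_000_000, 2_500_000_000]
};

const result = solDeltas(sampleTx, new Set(['CHTRJ5pnyBsbmPjPViMhG9CzGZtne4ivifKdPwm3mR3J']));
console.log(result[0].sol); // 1.5
```

This only covers native SOL. USDC movements appear in the transaction's `preTokenBalances` and `postTokenBalances`, which the filter above does not forward.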

4. QuickNode Functions Integration

We use QuickNode Functions to deliver the processed transaction data to our server via webhooks.

Emit webhook from Function


// Import dependencies, learn more at: <https://www.quicknode.com/docs/functions/runtimes/node-js-20-runtime>
const https = require('https');
const http = require('http');
const { URL } = require('url');

/**
 * main(params) is invoked when your Function is called from Streams or API.
 * 
 * @param {Object} params - May contain a dataset (params.data), params.metadata, and params.user_data
 * 
 * @returns {Object} - A message that will be logged and returned when a Function is called via API.
 *           Tip: You can chain Functions together by calling them inside each other.
 * Learn more at: <https://www.quicknode.com/docs/functions/getting-started#overview>
 */

async function main(filteredStream) {
    try {
        if (filteredStream.length > 0) {
            const payload = JSON.stringify({
                timestamp: new Date().toISOString(),
                transactionCount: filteredStream.length,
                transactions: filteredStream
            });

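            // AWS_PUBLIC_IP and PORT are placeholders for the webhook server's host and port;
            // they must be defined (or replaced with real values) before this Function is deployed.
            const webhookUrl = new URL(`http://${AWS_PUBLIC_IP}:${PORT}/api/webhook/quicknode/streams`);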
            
            // Choose http or https based on protocol
            const protocol = webhookUrl.protocol === 'https:' ? https : http;

            const options = {
                hostname: webhookUrl.hostname,
                port: webhookUrl.port || (webhookUrl.protocol === 'https:' ? 443 : 80),
                path: webhookUrl.pathname,
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'Content-Length': Buffer.byteLength(payload)
                },
                // Disables TLS certificate verification so self-signed certificates work in dev.
                // Do not keep this setting for production HTTPS endpoints.
                rejectUnauthorized: false
            };

            await new Promise((resolve, reject) => {
                const req = protocol.request(options, res => {
                    let data = '';
                    res.on('data', chunk => {
                        data += chunk;
                    });
                    res.on('end', () => {
                        if (res.statusCode >= 200 && res.statusCode < 300) {
                            console.log('Webhook response:', data);
                            resolve(data);
                        } else {
                            reject(new Error(`HTTP Error: ${res.statusCode} ${data}`));
                        }
                    });
                });

                req.on('error', error => {
                    console.error('Error sending to webhook:', error);
                    reject(error);
                });

                req.setTimeout(5000, () => {
                    req.destroy();
                    reject(new Error('Request timeout'));
                });

                req.write(payload);
                req.end();
            });
        }

        return filteredStream.length === 0 ? null : {
            transactions: filteredStream,
        };
    } catch (e) {
        console.error('Error processing transaction:', e);
        return { error: e.message };
    }
}
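
The Function above assumes its argument is always an array. The filter can also return an `{ error }` object, and the template comment notes that the payload may arrive wrapped in `params.data`. A small normalizing step, sketched below, would make that assumption explicit. `extractTransactions` is a hypothetical helper name, and the exact shape QuickNode passes in should be confirmed against a real invocation.

```javascript
// Hypothetical helper, not part of the original Function.
/**
 * Normalizes the Function input to an array of transactions.
 * Accepts either a bare array or an object wrapping the array in `data`;
 * anything else (null, an { error } object, a string) yields [].
 */
function extractTransactions(params) {
    const isWrapped = params && !Array.isArray(params) && params.data !== undefined;
    const candidate = isWrapped ? params.data : params;
    return Array.isArray(candidate) ? candidate : [];
}

console.log(extractTransactions([{ signature: 'a' }]).length);           // 1
console.log(extractTransactions({ data: [{ signature: 'a' }] }).length); // 1
console.log(extractTransactions({ error: 'boom' }).length);              // 0
```

With this in place, `main` could call `extractTransactions` once at the top and use the result wherever it currently reads `filteredStream`.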

Webhook Handler

  async handleQuicknodeStreamsWebhook(data) {
    try {
      const solPrice = await getSOLRate();
      const transactions = data.transactions;

      for (const tx of transactions) {
        const writableAccounts = tx.accounts.filter(
          (account) => !account.isSigner && account.isWritable,
        );

        for (const account of writableAccounts) {
          const wallet = await this.walletModel.findOne({
            address: account.address,
            network: 'SOL',
          });

          if (wallet) {
            await this.walletReconciliationService.reconcileWalletBalance(
              wallet,
              solPrice,
            );
          }
        }
      }
    } catch (error) {
      this.logger.error('Error processing QuickNode webhook:', error);
    }
  }
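
The handler looks up a wallet for every writable, non-signer account of every transaction, so an address that appears in several transactions of one batch is queried and reconciled repeatedly. One possible refinement, sketched below, is to collect the unique candidate addresses first. `collectCandidateAddresses` is a hypothetical helper and is not part of our service code.

```javascript
// Hypothetical helper, not part of the original handler.
/**
 * Collects the unique addresses of writable, non-signer accounts
 * across a batch of transactions from the webhook payload.
 */
function collectCandidateAddresses(transactions) {
    const addresses = new Set();
    for (const tx of transactions || []) {
        for (const account of tx.accounts || []) {
            if (!account.isSigner && account.isWritable) {
                addresses.add(account.address);
            }
        }
    }
    return [...addresses];
}

// Made-up batch: the same recipient appears in both transactions.
const batch = [
    { accounts: [
        { address: 'Sender1', isSigner: true, isWritable: true },
        { address: 'Recipient', isSigner: false, isWritable: true }
    ] },
    { accounts: [
        { address: 'Sender2', isSigner: true, isWritable: true },
        { address: 'Recipient', isSigner: false, isWritable: true },
        { address: 'ReadOnlyProgram', isSigner: false, isWritable: false }
    ] }
];

console.log(collectCandidateAddresses(batch)); // [ 'Recipient' ]
```

The handler could then iterate over this list, so each wallet is looked up and reconciled at most once per webhook call.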

5. Keeping the filter function updated

Keeping the filter function updated was a headache, for the reasons below: