Implementing QuickNode Streams and Functions has revolutionized how Chatter Wallet handles blockchain transactions, particularly for USDC and SOL tokens. This technical documentation outlines our integration process, architecture decisions, and implementation details.
The naive approach would be to implement a custom watcher that fires whenever an external transaction touches any wallet maintained by the system, but this route comes with a lot of issues. Below is a Technical Architecture Diagram showing how we use the low-latency, performant, and highly scalable systems developed at QuickNode to achieve a much better result by dealing only with the data we absolutely need.
graph TD;
A["Blockchain Transaction"] --> B["QuickNode Streams"];
B --> C["Transaction Filters"];
C --> D["Webhook API Server"];
D --> E["User Account Updates"];
D --> F["Transaction Processing"];
Our QuickNode Streams implementation focuses on efficient transaction monitoring with minimal overhead.
// Filter for token transfers on the USDC account, in both directions.
// NOTE(review): the account strings look like placeholders — confirm real
// addresses are substituted before use.
const usdcTransferFilter = {
  account: 'USDC_TOKEN_ADDRESS',
  type: 'token_transfer',
  direction: 'both',
};

// Filter for native transfers on the SOL wallet account.
const solTransferFilter = {
  account: 'SOL_WALLET_ADDRESS',
  type: 'native_transfer',
};

// Stream definition: the target network plus the ordered list of filters.
const streamConfig = {
  network: 'solana-mainnet',
  filters: [usdcTransferFilter, solTransferFilter],
};
// Import dependencies, learn more at: <https://www.quicknode.com/docs/functions/runtimes/node-js-20-runtime>
/**
* main(params) is invoked when your Function is called from Streams or API.
*
* @param {Object} params - May contain a dataset (params.data), params.metadata, and params.user_data
*
* @returns {Object} - A message that will be logged and returned when a Function is called via API.
* Tip: You can chain Functions together by calling them inside each other.
* Learn more at: <https://www.quicknode.com/docs/functions/getting-started#overview>
*/
// Program id compared against each instruction's `programId`.
const VOTE_PROGRAM_ID = "Vote111111111111111111111111111111111111111";

/**
 * Reports whether a transaction carries at least one vote instruction.
 *
 * An instruction counts as a vote when its `program` is "vote" or its
 * `programId` equals VOTE_PROGRAM_ID.
 *
 * @param {Object} tx - Transaction entry; instructions are read from
 *   `tx.transaction.message.instructions`.
 * @returns {boolean} `true` when a vote instruction is present; `false`
 *   otherwise, including when the instruction list is missing or not an array.
 */
const isVoteTransaction = (tx) => {
  const instructions = tx?.transaction?.message?.instructions;
  // Always answer with a real boolean, never `undefined`, so callers can
  // rely on strict comparisons.
  if (!Array.isArray(instructions)) {
    return false;
  }
  return instructions.some(
    (instruction) =>
      instruction?.program === "vote" || instruction?.programId === VOTE_PROGRAM_ID,
  );
};
/**
 * Stream filter: keeps only successful, non-vote transactions that involve
 * one of the stored wallet addresses, and returns a trimmed summary of each.
 *
 * Malformed entries (null blocks, blocks without a `transactions` array,
 * transactions without `accountKeys` or `meta`) are skipped individually so
 * that one bad record does not discard the rest of the batch.
 *
 * @param {Object|Array} stream - Either the payload itself or an object whose
 *   `data` property holds it. The payload may be a single block or an array
 *   of blocks; each block is expected to expose a `transactions` array.
 * @returns {Array<Object>|{error: string}} Summaries of the matching
 *   transactions (empty array when none match), or `{ error }` if an
 *   unexpected exception occurs.
 */
function main(stream) {
  try {
    const incomingData = stream?.data ? stream.data : stream;
    const blocks = Array.isArray(incomingData) ? incomingData : [incomingData];
    const storedAddresses = ["CHTRJ5pnyBsbmPjPViMhG9CzGZtne4ivifKdPwm3mR3J"];
    const addressSet = new Set(storedAddresses);
    const matchingTransactions = [];

    for (const block of blocks) {
      if (!Array.isArray(block?.transactions)) {
        continue;
      }

      for (const transaction of block.transactions) {
        const accountKeys = transaction?.transaction?.message?.accountKeys;
        if (!Array.isArray(accountKeys)) {
          continue;
        }
        if (isVoteTransaction(transaction)) {
          continue;
        }

        const hasMatchingAddress = accountKeys.some((account) =>
          addressSet.has(account?.pubkey),
        );
        if (!hasMatchingAddress) {
          continue;
        }

        // Success is signalled by `meta.status.Ok === null`. A transaction
        // with no `meta` at all is treated as not successful and skipped
        // instead of throwing and aborting the whole batch.
        const meta = transaction.meta;
        if (meta?.status?.Ok !== null) {
          continue;
        }

        matchingTransactions.push({
          // Prefer the top-level fields; when absent, fall back to where the
          // Solana getBlock JSON layout keeps them (signatures nested under
          // `transaction`, blockhash on the enclosing block).
          signature:
            transaction.signatures?.[0] ?? transaction.transaction.signatures?.[0],
          blockhash: transaction.blockhash ?? block.blockhash,
          accounts: accountKeys.map((account) => ({
            address: account.pubkey,
            isSigner: account.signer,
            isWritable: account.writable,
          })),
          // Only successful transactions reach this point.
          status: 'confirmed',
          fee: meta.fee,
          computeUnitsConsumed: meta.computeUnitsConsumed,
          preBalances: meta.preBalances,
          postBalances: meta.postBalances,
          logs: meta.logMessages,
        });
      }
    }

    return matchingTransactions;
  } catch (e) {
    console.error('Error processing transaction:', e);
    return { error: e.message };
  }
}
We utilized QuickNode Functions for the delivery of processed transaction data to our server via webhooks.
// Import dependencies, learn more at: <https://www.quicknode.com/docs/functions/runtimes/node-js-20-runtime>
const https = require('https');
const http = require('http');
const { URL } = require('url');
/**
* main(params) is invoked when your Function is called from Streams or API.
*
* @param {Object} params - May contain a dataset (params.data), params.metadata, and params.user_data
*
* @returns {Object} - A message that will be logged and returned when a Function is called via API.
* Tip: You can chain Functions together by calling them inside each other.
* Learn more at: <https://www.quicknode.com/docs/functions/getting-started#overview>
*/
/**
 * Delivers filtered transactions to the webhook endpoint with an HTTP POST.
 *
 * Relies on `AWS_PUBLIC_IP` and `PORT` being defined in the surrounding
 * scope; they are not declared inside this function.
 *
 * @param {Array<Object>|{data: Array<Object>}} filteredStream - The matching
 *   transactions, either as a bare array or wrapped in an object's `data`
 *   property.
 * @returns {Promise<null|{transactions: Array<Object>}|{error: string}>}
 *   `null` when there is nothing to deliver, `{ transactions }` after a 2xx
 *   response, or `{ error }` when the request fails, times out, or the
 *   endpoint answers with a non-2xx status.
 */
async function main(filteredStream) {
  try {
    // Accept both a bare array and a `{ data: [...] }` wrapper.
    const transactions = Array.isArray(filteredStream)
      ? filteredStream
      : filteredStream?.data;

    // Nothing to deliver: also covers null/undefined input, which previously
    // surfaced as a TypeError wrapped in `{ error }`.
    if (!Array.isArray(transactions) || transactions.length === 0) {
      return null;
    }

    const payload = JSON.stringify({
      timestamp: new Date().toISOString(),
      transactionCount: transactions.length,
      transactions,
    });

    const webhookUrl = new URL(`http://${AWS_PUBLIC_IP}:${PORT}/api/webhook/quicknode/streams`);
    const isHttps = webhookUrl.protocol === 'https:';
    // Choose http or https based on protocol
    const protocol = isHttps ? https : http;

    const options = {
      hostname: webhookUrl.hostname,
      port: webhookUrl.port || (isHttps ? 443 : 80),
      // Keep any query string that is part of the URL.
      path: `${webhookUrl.pathname}${webhookUrl.search}`,
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Content-Length': Buffer.byteLength(payload),
      },
      // SECURITY NOTE(review): this disables TLS certificate verification for
      // every https request, not only in dev. It has no effect while the URL
      // is plain http, but must be removed or gated before pointing this at
      // an https endpoint in production.
      rejectUnauthorized: false,
    };

    await new Promise((resolve, reject) => {
      const req = protocol.request(options, (res) => {
        // Decode as UTF-8 up front so multi-byte characters split across
        // chunks are not corrupted by per-chunk string conversion.
        res.setEncoding('utf8');
        let data = '';
        res.on('data', (chunk) => {
          data += chunk;
        });
        res.on('end', () => {
          if (res.statusCode >= 200 && res.statusCode < 300) {
            console.log('Webhook response:', data);
            resolve(data);
          } else {
            reject(new Error(`HTTP Error: ${res.statusCode} ${data}`));
          }
        });
      });

      req.on('error', (error) => {
        console.error('Error sending to webhook:', error);
        reject(error);
      });

      // Destroying with an explicit error routes the timeout through the
      // single 'error' handler above.
      req.setTimeout(5000, () => {
        req.destroy(new Error('Request timeout'));
      });

      req.write(payload);
      req.end();
    });

    return { transactions };
  } catch (e) {
    console.error('Error processing transaction:', e);
    return { error: e.message };
  }
}
async handleQuicknodeStreamsWebhook(data) {
try {
const solPrice = await getSOLRate();
const transactions = data.transactions;
for (const tx of transactions) {
const writableAccounts = tx.accounts.filter(
(account) => !account.isSigner && account.isWritable,
);
for (const account of writableAccounts) {
const wallet = await this.walletModel.findOne({
address: account.address,
network: 'SOL',
});
if (wallet) {
await this.walletReconciliationService.reconcileWalletBalance(
wallet,
solPrice,
);
}
}
}
} catch (error) {
this.logger.error('Error processing QuickNode webhook:', error);
}
}
Keeping the filter function was a headache, for the reasons below: