How to add new keys to 50 Million+ records using MongoDB Streams

Introduction

MongoDB is a popular NoSQL database that is widely used for building scalable applications. As your application grows and evolves, you may need to make changes to your MongoDB collections, such as adding a new field to your documents. Migrating large MongoDB collections can be a challenging task, especially when dealing with millions of records.

In this article, we’ll explore how to use Node.js and MongoDB streams to efficiently migrate large MongoDB collections. We’ll add two new keys to a MongoDB collection with 50 million+ records using streams and compare this approach with updating documents one at a time through a cursor.

Adding new keys to a MongoDB Collection

Let’s start with an example scenario where we need to add new keys to a MongoDB collection with 50 million+ records. We can accomplish this with Node.js and MongoDB’s updateOne() method, which updates a single document in a collection. Below is a sample record, followed by a Node.js script that adds the new keys (city and state, derived from address2) using a cursor:

Sample record:

{
  "name": "vivek bharatha",
  "address1": "horizon",
  "address2": "cityName, StateName"
}

const { MongoClient } = require("mongodb");
const logger = require("log4js").getLogger();
logger.level = 'debug';

const uri = "mongodb://localhost:27017";
const dbName = "v-bank";
const collectionName = "users";

(async () => {
  const client = new MongoClient(uri);
  await client.connect();

  const db = client.db(dbName);
  const collection = db.collection(collectionName);
  const query = { state: { $exists: false } };
  const count = await collection.countDocuments(query);
  logger.info(`Total records to process: ${count}`);
  const cursor = collection.find(query);
  let processed = 0;

  try {
    while (await cursor.hasNext()) {
      const doc = await cursor.next();
      const [city, state] = doc.address2.split(", ");
      const result = await collection.updateOne(
        { _id: doc._id },
        { $set: { state, city } }
      );
      if (result.modifiedCount) {
        processed += result.modifiedCount;
      }

      logger.info(`Percentage completed: ${((processed / count) * 100).toFixed(3)} %`);
    }

    logger.info("Migration completed successfully");
  } catch (err) {
    logger.error(err);
  } finally {
    await client.close();
  }
})();

This script iterates over the documents in the collection using a cursor and adds the new keys to each document with MongoDB’s updateOne() method. This works for smaller collections, but on large ones it is slow and puts heavy load on the database instance, because every document costs a separate update request. That load can degrade the performance of the whole application.
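To put the request count in rough numbers, here is a small sketch. The helper name writeRequests is made up for illustration, and the figures ignore any further splitting that the driver or server may apply to large batches.

```javascript
// Hypothetical helper: how many write requests does a migration send
// when updates are grouped into batches of `batchSize` documents?
function writeRequests(totalDocs, batchSize) {
  return Math.ceil(totalDocs / batchSize);
}

const totalDocs = 50_000_000;

// One updateOne() call per document
console.log(writeRequests(totalDocs, 1));

// Updates grouped into batches of 100K
console.log(writeRequests(totalDocs, 100_000));
```

The gap between the two numbers is the main reason the batched approach described next is easier on the database.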


Using MongoDB Streams to Migrate Large Collections

Streams are a powerful feature of Node.js, and the MongoDB Node.js driver can expose a find() cursor as a readable stream. Combined with batched writes, streams can be a more memory-efficient and performant approach than updating documents one at a time through a cursor, especially when dealing with large collections.

To use streams to migrate a MongoDB collection, we’ll create a read stream that reads the documents from the collection and a transform stream that computes the new keys for each document. We’ll then pipe the read stream into the transform stream and queue an update for each document as it arrives.
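Before bringing the database in, the read, transform, and consume flow can be tried on in-memory data. The following is a minimal sketch, not the migration itself: sampleDocs, createAddressTransform, and runPipeline are illustrative names, and Readable.from() stands in for the stream that collection.find() would provide.

```javascript
const { Readable, Transform, Writable } = require("stream");
const { pipeline } = require("stream/promises");

// In-memory stand-in for the documents a find() stream would emit
const sampleDocs = [
  { _id: 1, address2: "cityName, StateName" },
  { _id: 2, address2: "otherCity, OtherState" },
];

// Object-mode transform that derives city and state from address2
function createAddressTransform() {
  return new Transform({
    objectMode: true,
    transform(doc, _encoding, callback) {
      const [city, state] = doc.address2.split(", ");
      callback(null, { ...doc, city, state });
    },
  });
}

// Runs source -> transform -> sink and resolves with the collected documents
async function runPipeline(docs) {
  const collected = [];
  const sink = new Writable({
    objectMode: true,
    write(doc, _encoding, callback) {
      collected.push(doc); // a real migration would queue an update here
      callback();
    },
  });
  await pipeline(Readable.from(docs), createAddressTransform(), sink);
  return collected;
}

runPipeline(sampleDocs).then((docs) => console.log(docs));
```

Using pipeline() here means an error in any stage rejects the returned promise, which keeps the sketch short. The script in this article wires the listeners by hand instead.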

Here’s an example Node.js script that uses MongoDB streams to add the new keys to a MongoDB collection with 50 million+ records:

const { MongoClient } = require("mongodb");
const { Transform } = require("stream");
const logger = require("log4js").getLogger();
logger.level = 'debug';

const uri = "mongodb://localhost:27017";
const dbName = "v-bank";
const collectionName = "users";
const batchSize = 100_000;

(async () => {
  const client = new MongoClient(uri);
  await client.connect();

  const db = client.db(dbName);
  const collection = db.collection(collectionName);
  const query = { state: { $exists: false } };
  const totalRecordsToProcess = await collection.countDocuments(query);
  logger.info(`Total records to process: ${totalRecordsToProcess}`);

  let count = 0;
  let processed = 0;

  const docTransformStream = new Transform({
    objectMode: true,
    transform(doc, encoding, callback) {
      try {
        // Derive the new keys from the "city, state" pattern in address2
        const [city, state] = doc.address2.split(", ");
        doc.city = city;
        doc.state = state;
        callback(null, doc);
      } catch (err) {
        // Skip documents that cannot be parsed, but record which ones
        logger.error(`${err} for doc id: ${doc._id}`);
        callback();
      }
    },
  });

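  const readStream = collection.find(query).stream();
  // Listen on the piped stream so that pausing it really stops "data" events
  const docStream = readStream.pipe(docTransformStream);
  let updateStream = collection.initializeUnorderedBulkOp();

  // Executes a bulk op and adds its modified count to the running total.
  // Newer drivers expose modifiedCount; older ones expose nModified.
  const flush = async (bulkOp) => {
    const result = await bulkOp.execute();
    const modified = result.modifiedCount ?? result.nModified;
    processed += modified;
    return modified;
  };

  // pipe() does not forward source errors, so log them separately
  readStream.on("error", (err) => logger.error(err));

  docStream
    .on("data", async (doc) => {
      updateStream
        .find({ _id: doc._id })
        .updateOne({ $set: { city: doc.city, state: doc.state } });
      count++;

      if (count % batchSize === 0) {
        docStream.pause();

        // Swap in a fresh bulk op before awaiting the full one
        const batch = updateStream;
        updateStream = collection.initializeUnorderedBulkOp();

        try {
          await flush(batch);
          logger.info(
            `Percentage completed: ${(
              (processed / totalRecordsToProcess) *
              100
            ).toFixed(3)} %`
          );
        } catch (err) {
          logger.error(err);
        }

        docStream.resume();
      }
    })
    .on("error", (err) => logger.error(err))
    .on("end", async () => {
      try {
        // Flush the last partial batch, if there is one
        if (count % batchSize !== 0) {
          const modified = await flush(updateStream);
          logger.info(`Updated ${modified} records`);
        }
        logger.info(`Total records modified: ${processed}`);
      } catch (err) {
        logger.error(err);
      } finally {
        // Close the connection even when the last batch was a full one
        await client.close();
      }
    });
})();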

In this script, we convert the find() cursor into a stream called readStream. The docTransformStream transform function adds the new keys to each document, and we attach data, error, and end listeners to the piped stream.

We use the Bulk API to update documents in batches of batchSize. We initialize a bulk operation as updateStream; once batchSize updates are queued, we execute it and replace it with a new bulk operation object.
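The same batch can also be expressed as an array of operations for collection.bulkWrite(), which accepts updateOne entries made of a filter and an update. A minimal sketch follows. toBulkOps is a hypothetical helper, not part of the driver, and the commented-out call assumes a connected collection like the one in the script above.

```javascript
// Hypothetical helper: turn transformed docs into bulkWrite() operations
function toBulkOps(docs) {
  return docs.map((doc) => ({
    updateOne: {
      filter: { _id: doc._id },
      update: { $set: { city: doc.city, state: doc.state } },
    },
  }));
}

const ops = toBulkOps([{ _id: 1, city: "cityName", state: "StateName" }]);
console.log(JSON.stringify(ops, null, 2));

// With a connected collection (assumed), an unordered batch would be sent as:
// await collection.bulkWrite(ops, { ordered: false });
```

Building a plain array makes a batch easy to inspect or log before it is sent, at the cost of holding the operation objects yourself instead of letting the bulk operation object collect them.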

Because the data handler contains an async operation, the stream doesn’t wait for that operation to finish; it keeps emitting data events. To handle this correctly, we pause the stream when we hit the batch size, execute the bulk operation, and then resume the stream.

Finally, in the end listener, we check whether a last partial batch of records remains and execute it as well.

Conclusion

Migrating MongoDB collections can be a complex and challenging task, especially when dealing with large collections. In this article, we explored how to use Node.js and MongoDB streams to efficiently migrate large MongoDB collections. We demonstrated how to add new keys to a MongoDB collection with 50 million+ records using streams, and compared this approach with updating documents one at a time through a cursor.

By using streams and bulk operations, we processed the documents in the collection without loading all the data into memory at once and sent the updates in batches, resulting in a faster and more memory-efficient migration.

NOTE:
This approach rests on a few assumptions:
- The new keys are not used at the application level during the migration.
- Every address2 field contains the city and state name in the pattern “city, state”.

You might wonder about the batchSize of 100K defined in the script above. It is not necessarily the ideal batch size; by testing in lower environments such as staging or UAT, you can narrow it down to a sweet spot for a large dataset of 50M+ records.
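If the “city, state” assumption might not hold for every record, a stricter parser can flag the exceptions instead of writing partial values. This is a sketch under that assumption; splitAddress is a hypothetical helper that does not appear in the scripts above.

```javascript
// Hypothetical helper: returns { city, state }, or null when address2
// does not match the expected "city, state" pattern
function splitAddress(address2) {
  if (typeof address2 !== "string") return null;
  const parts = address2.split(",").map((part) => part.trim());
  if (parts.length !== 2 || parts.some((part) => part === "")) return null;
  const [city, state] = parts;
  return { city, state };
}

console.log(splitAddress("cityName, StateName")); // well-formed value
console.log(splitAddress("cityName"));            // missing state
console.log(splitAddress(undefined));             // missing field
```

Documents for which the helper returns null could be logged and skipped, then handled separately after the main migration.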
