How I Screwed Up NestJs Request Scoped DI and Db Transactions with a Factory

Everyone loves a highlight reel, but today I find myself humbled. Today you get a blooper, even though Gemini told me I was doing a fantastic job with "...incredibly sharp and important question(s)".

As I've told my co-workers and plenty of my friends, pain is a great teacher. This is a lesson that cut deep, leaving an unforgettable scar.

Before You Read

  1. Make sure you understand NestJs Dependency Injection Scopes.
  2. Make sure you understand the RxJs operators forkJoin and concat.

Problems

Social Sprinkler publishes to multiple social media channels. Each post to be published is stored in a table along with a reference to the channel it should be published to. Every channel has a publisher service that obeys a specific service contract. A factory then chooses the publisher service to use based on the post's social channel field.

This worked great until unexpected behavior surfaced.

  1. My Node publisher service thread was blocked because I used the forkJoin operator on RxJs streams that never emitted a value.
  2. Data mutations run inside a transaction that an interceptor commits on success and rolls back on error. One table was updating correctly, while another table with child records was not updating.

Database Connection Module

While these classes aren't necessarily the root cause of the issue, I'm going to present them so you understand some of the choices I made for data access with a MySQL database.

A @Global module contains all the necessary services to connect to my database. This includes a singleton database connection pool service, database connection manager, connection context service, and an interceptor as a Unit of Work pattern for database changes.

Database Connection Pool

The SqlDbPoolService is a singleton that creates a connection pool to reuse connections. It’s responsible for creating the pool and releasing the pool on application or module destroy.

Database Connection Manager

The DbConnectionManager is a request scoped service that uses the SqlDbPoolService to create and release connections.

SqlContext Service

The SqlContextService is a request scoped service that uses the connection created by the DbConnectionManager. It starts, commits, and rolls back transactions. Finally, it can release connections.

Transaction Interceptor

The TransactionInterceptor allows the controller route handler to execute. Using the SqlContextService, saveChanges executes in the stream when no errors are thrown. When errors are thrown, the transaction is rolled back. On completion of the stream, the finalize operator is executed and the connection is released.

It's provided as an APP_INTERCEPTOR so it executes for every request made.
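The commit, rollback, and release ordering described above can be sketched as a plain RxJs operator. This is a minimal sketch assuming RxJs 7, not the project's actual interceptor: TransactionContext, withTransaction, and fakeContext are hypothetical names, and the real SqlContextService may expose a different shape. Inside a NestJs interceptor, something like this would be applied to the stream returned by next.handle().

```typescript
import { MonoTypeOperatorFunction, Observable, concat, of, throwError } from "rxjs";
import { catchError, concatMap, finalize, ignoreElements } from "rxjs/operators";

// Hypothetical slice of SqlContextService: only what the pipeline needs.
interface TransactionContext {
  saveChanges(): Observable<unknown>; // commit the open transaction
  rollback(): Observable<unknown>; // undo the open transaction
  release(): void; // hand the connection back to the pool
}

// Commit after the handler emits, roll back on any error, always release.
function withTransaction<T>(ctx: TransactionContext): MonoTypeOperatorFunction<T> {
  return (source) =>
    source.pipe(
      // Commit first, then pass the handler's result through unchanged.
      concatMap((result) => concat(ctx.saveChanges().pipe(ignoreElements()), of(result))),
      // Roll back, then rethrow the original error for the caller to handle.
      catchError((err) => concat(ctx.rollback().pipe(ignoreElements()), throwError(() => err))),
      // Runs on completion, error, or unsubscribe.
      finalize(() => ctx.release()),
    );
}

// Fake context that only records the order of calls.
function fakeContext(calls: string[]): TransactionContext {
  return {
    saveChanges: () => {
      calls.push("saveChanges");
      return of(undefined);
    },
    rollback: () => {
      calls.push("rollback");
      return of(undefined);
    },
    release: () => {
      calls.push("release");
    },
  };
}

// A handler that succeeds: commit, then release.
const successCalls: string[] = [];
of("created").pipe(withTransaction(fakeContext(successCalls))).subscribe();

// A handler that fails: roll back, then release.
const failureCalls: string[] = [];
throwError(() => new Error("insert failed"))
  .pipe(withTransaction(fakeContext(failureCalls)))
  .subscribe({ error: () => undefined }); // swallow the rethrown error in this demo
```

Note that this only works if the context passed in is the same instance the services used for their writes, which is exactly what went wrong later in this story.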

Problem 1 - Blocked Thread

The publisher service kept timing out at five minutes of execution. Looking through my logs didn't give me many hints. Adding log messages throughout led me to where the request hung.

Based on the log messages, I noticed that the request hung in one of my publisher services. The publisher service is instantiated dynamically by a factory, based on the social media channel the content is posted to. After adding another log message inside the instantiated publisher service, I saw that it never emitted a value at a point where I thought multiple parallel calls could be made. There, I used the forkJoin operator to make multiple calls to my database.
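For context on that symptom: forkJoin only emits once every source has completed, and it emits nothing if any source completes without a value. The sketch below demonstrates those documented behaviors with stand-in sources (of, EMPTY, NEVER) rather than the project's real database calls, so it shows how a forkJoin can stay silent, not what the publisher service actually did.

```typescript
import { EMPTY, NEVER, forkJoin, of } from "rxjs";

const events: string[] = [];

// Records every notification a subscription receives under a label.
const observe = (label: string) => ({
  next: (value: unknown) => events.push(`${label} next ${JSON.stringify(value)}`),
  complete: () => events.push(`${label} complete`),
});

// Every source emits and completes: forkJoin emits the last values, then completes.
forkJoin([of("posts"), of("channels")]).subscribe(observe("both"));

// One source completes without emitting: forkJoin completes without emitting.
forkJoin([of("posts"), EMPTY]).subscribe(observe("empty"));

// One source never completes: forkJoin stays silent, which looks like a hung request.
forkJoin([of("posts"), NEVER]).subscribe(observe("never"));

// events is now:
// ['both next ["posts","channels"]', "both complete", "empty complete"]
```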

Rather than running database calls in parallel, I decided to run them sequentially in the order they were called. This would help with row locking issues in the database within the transaction. Then I'd collect all the returned values. To do this, I used a combination of the concat operator and toArray. The concat operator subscribes to its observables in order and waits to start the next observable until the previous one completes. toArray collects all of the emitted values and emits them as a single array once the last observable completes.
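A small sketch of that ordering, using a Subject as a hypothetical stand-in for a slow database call so the timing can be controlled by hand. The names first, second, and firstQuery are illustrative only.

```typescript
import { Subject, concat, defer, of } from "rxjs";
import { toArray } from "rxjs/operators";

const started: string[] = [];

// Stand-in for a slow database call that finishes only when we say so.
const firstQuery = new Subject<string>();

// defer() delays the side effect until concat actually subscribes.
const first = defer(() => {
  started.push("first");
  return firstQuery;
});
const second = defer(() => {
  started.push("second");
  return of("second result");
});

let results: string[] = [];
concat(first, second)
  .pipe(toArray())
  .subscribe((values) => (results = values));

// While the first call is pending, the second has not been started.
const startedBeforeFirstCompleted = started.slice(); // ["first"]

firstQuery.next("first result");
firstQuery.complete(); // only now does concat subscribe to `second`

// started -> ["first", "second"]
// results -> ["first result", "second result"]
```

One caveat: toArray collects every emission, so a source that emits more than once contributes more than one entry to the array.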

Using concat with toArray was a bit annoying, and I wanted it to function a bit more like forkJoin with typing. So how did I do this with a single operator?

I created two new functions, concatArray and concatObj. They execute similarly to forkJoin, but in sequential order, and merge the results into one.

concatArray executes the array of observables in order and types the results in that same order.
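One way such a helper could look is sketched below, assuming RxJs 7. This is an illustration, not the project's actual concatArray, and ConcatArrayResult is a hypothetical helper type. It takes the last value of each source, as forkJoin does, but unlike forkJoin it errors (via last) when a source completes without emitting instead of staying silent.

```typescript
import { Observable, ObservableInput, ObservedValueOf, concat, from, of } from "rxjs";
import { last, toArray } from "rxjs/operators";

// Result tuple: one entry per source, typed by what that source emits.
type ConcatArrayResult<T extends readonly ObservableInput<unknown>[]> = {
  [K in keyof T]: ObservedValueOf<T[K]>;
};

function concatArray<T extends readonly ObservableInput<unknown>[]>(
  sources: readonly [...T],
): Observable<ConcatArrayResult<T>> {
  // Keep only the final value of each source so the array has one entry per source.
  const inOrder: Observable<unknown>[] = sources.map((source) =>
    from(source as ObservableInput<unknown>).pipe(last()),
  );
  // concat runs the sources one at a time; toArray gathers the results in order.
  // The cast is where the tuple typing is asserted rather than proven.
  return concat(...inOrder).pipe(toArray()) as unknown as Observable<ConcatArrayResult<T>>;
}

// Usage: the result is typed as [number, string].
let latest: [number, string] | undefined;
concatArray([of(1, 2, 3), of("done")]).subscribe((values) => (latest = values));
// latest -> [3, "done"]
```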

The concatObj executes the object keys in the order given.
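The object variant can be sketched the same way, again assuming RxJs 7 and again as an illustration rather than the project's actual concatObj. ConcatObjResult is a hypothetical helper type. Object.keys returns string keys in insertion order, except that integer-like keys are sorted first, so "the order given" holds for ordinary property names.

```typescript
import { Observable, ObservableInput, ObservedValueOf, concat, from, of } from "rxjs";
import { last, map, toArray } from "rxjs/operators";

// Result object: same keys as the input, typed by what each source emits.
type ConcatObjResult<T extends Record<string, ObservableInput<unknown>>> = {
  [K in keyof T]: ObservedValueOf<T[K]>;
};

function concatObj<T extends Record<string, ObservableInput<unknown>>>(
  sources: T,
): Observable<ConcatObjResult<T>> {
  const keys = Object.keys(sources);
  // Keep only the final value of each source, one per key.
  const inOrder: Observable<unknown>[] = keys.map((key) =>
    from(sources[key] as ObservableInput<unknown>).pipe(last()),
  );
  return concat(...inOrder).pipe(
    toArray(),
    // Zip the ordered results back onto their keys.
    map((values) => {
      const result: Record<string, unknown> = {};
      keys.forEach((key, index) => {
        result[key] = values[index];
      });
      return result as unknown as ConcatObjResult<T>;
    }),
  );
}

// Usage: the result is typed as { post: number; channels: string[] }.
let saved: { post: number; channels: string[] } | undefined;
concatObj({ post: of(42), channels: of(["a", "b"]) }).subscribe((value) => (saved = value));
// saved -> { post: 42, channels: ["a", "b"] }
```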

With these operators, concatArray replaced the forkJoin calls that used arrays, and concatObj replaced the forkJoin calls that used objects.

The forkJoin Replacement Result

The database calls executed in the necessary order. The thread was no longer blocked. Success!

But the next thing I noticed was a bit more insidious. I expected three records to be updated, while only one ever committed to the database. What... the... fuck?

Problem 2 - Factory Fun

Perplexed, I turned to Gemini, which for all its insights kept complimenting my questions with "...incredibly sharp and important question", sigh. As the only contributor to the project, I asked questions about dependency scoping. Remember, a factory creates an instance of the publisher services. But don't they exist in the same request context?

Turns out, it depends. It depends on how the class being instantiated is scoped and how it's shared in the request scope. The PublisherFactory chooses a social media publisher based on the channel the post will be published to. So originally, the PublisherFactory created a request scoped service using the code below.

If you can spot the error here, kudos. You're much better at NestJs than me. If you remember anything about how I'm saving database records, you'll understand that somehow the database context isn't the same within the publisher service when mutating records. In fact, at times it blocked the thread. So what was causing the issue?

One line... one fucking line! If you guessed which one, high five me if we ever meet. It comes down to this. The contextId needs to be the same as that of the request instantiating the publisher service. Calling ContextIdFactory.create creates a new context id. The moduleRef then views this as a new request context and instantiates new instances of the dependent classes.

ContextIdFactory.getByRequest(this.request) reuses the context id of the current request, so the same instances are used within the same request scope. Because the TransactionInterceptor and DbConnectionManager are request scoped, those services were reinstantiated when the contextId differed. This is why I was seeing some records update and others not, when I expected all of them to update in the same request.
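To make the mechanics concrete, here is a toy model with no NestJs dependency. It is not how NestJs is implemented internally, and every name in it is hypothetical. It only captures the one idea that matters: request scoped instances are cached per context id, so a fresh id (as ContextIdFactory.create returns) gets a fresh instance, while the request's own id (as ContextIdFactory.getByRequest returns) gets the shared one.

```typescript
// Toy model only: NOT NestJs internals. All names below are hypothetical.
type ContextId = { readonly id: number };

let nextId = 1;

// Stands in for ContextIdFactory.create(): always a brand new id.
function createContextId(): ContextId {
  return { id: nextId++ };
}

// Stands in for ContextIdFactory.getByRequest(): one stable id per request object.
const contextIdsByRequest = new WeakMap<object, ContextId>();
function contextIdForRequest(request: object): ContextId {
  let contextId = contextIdsByRequest.get(request);
  if (!contextId) {
    contextId = createContextId();
    contextIdsByRequest.set(request, contextId);
  }
  return contextId;
}

// Stands in for the request scoped SqlContextService and its open transaction.
class SqlContext {
  readonly pendingWrites: string[] = [];
}

// Stands in for resolving a request scoped provider with a given context id.
class ToyContainer {
  private readonly instances = new Map<ContextId, SqlContext>();

  resolveSqlContext(contextId: ContextId): SqlContext {
    let instance = this.instances.get(contextId);
    if (!instance) {
      instance = new SqlContext();
      this.instances.set(contextId, instance);
    }
    return instance;
  }
}

const container = new ToyContainer();
const request = { url: "/publish" };

// The interceptor commits whatever is attached to the incoming request's context.
const interceptorContext = container.resolveSqlContext(contextIdForRequest(request));

// Buggy factory: a fresh context id yields a different SqlContext,
// so these writes are invisible to the interceptor's commit.
const buggyPublisherContext = container.resolveSqlContext(createContextId());
buggyPublisherContext.pendingWrites.push("UPDATE child rows");

// Fixed factory: the request's own context id yields the shared SqlContext.
const fixedPublisherContext = container.resolveSqlContext(contextIdForRequest(request));
fixedPublisherContext.pendingWrites.push("UPDATE post");

const sharedWithBuggy = interceptorContext === buggyPublisherContext; // false
const sharedWithFixed = interceptorContext === fixedPublisherContext; // true
// interceptorContext.pendingWrites -> ["UPDATE post"]
```

In the model, the write made through the buggy factory sits on a context nobody commits, which matches the symptom of one table updating while the child records did not.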

Conclusion

The TransactionInterceptor commits transactions and releases the connection. This lets connections be reused within the connection pool and removes the need to explicitly commit each unit of work from a controller's route.

Executing database calls sequentially prevented row locks. Rather than using forkJoin, concat combined with the toArray operator gave me a predictable way to know when all database calls had completed and to return the results.

When a factory instantiates request scoped services, be deliberate about whether a new request context should be created. The combination of the TransactionInterceptor and a factory that created a new request context produced new instances for the SQL connection. Using the same request context to instantiate the publisher service allowed the same connection to be used by the other request scoped instances.