Workers - config
Psychic workers leverages BullMQ under the hood to drive our background job systems. Our configuration largely circulates around arguments which would be directly provided to bullmq, but with some nuance, which is both due to the nature of the system being wrapped, as well as the fact that we desired a simpler configuration path for those not looking to leverage every possible bell and whistle, and really just looking to set up something simple.
Types
Psychic scans your worker config whenever you run yarn psy sync
, capturing the workstream names you have configured for use when configuring your app. In order for these types to flow throughout your system, we recommend you add a few files to your file system.
If you selected "yes" to background workers during the cli prompt, these files will already exist in your system.
// app/services/ApplicationBackgroundedService.ts
import { BaseBackgroundedService } from '@rvoh/psychic-workers'
import psychicTypes from '../../types/psychic'
export default class ApplicationBackgroundedService extends BaseBackgroundedService {
public get psychicTypes() {
return psychicTypes
}
}
// app/services/ApplicationScheduledService.ts
import { BaseScheduledService } from '@rvoh/psychic-workers'
import psychicTypes from '../../types/psychic'
export default class ApplicationScheduledService extends BaseScheduledService {
public get psychicTypes() {
return psychicTypes
}
}
You can now inhgerit from each of these classes in your application, allowing the types to flow gracefully through and protect you from making any mistakes.
Workstreams
Psychic superimposes a new concept, called a workstream
, on top of bullmq's existing system, which enables one to easily grasp, configure, and express their background systems. In essense, you can think of a workstream as a group, containing a queue and a set of workers to work off that queue.
Many applications will only need one workstream, but multiple workstreams are extremely useful when, for example, you have background jobs responsible for hitting external APIs which are rate limited. Having a background system that doesn't respect rate limits can lead to real chaos in your systems, so it is useful to partition those out into their own workstreams. By doing so, you can ensure that only one worker is working off that queue, and you can either manually ensure that it breathes a certain amount of time before issuing another request, or else leverage BullMQ Pro, which provides rate limiting tools out of the box that would enable you to spin up multiple workers on this workstream without concern of rate limit abuse.
To set up workstreams in Psychic, all you need to do is add a basic workstream setup to your conf/workers.ts
file:
// conf/workers.ts
export default (workersApp: PsychicApplicationWorkers) => {
workersApp.set('background', {
defaultWorkstream: {
// https://docs.bullmq.io/guide/parallelism-and-concurrency
workerCount: os.cpus().length,
concurrency: 100,
},
defaultQueueConnection: AppEnv.isProduction
? new Cluster(
[
{
host: AppEnv.string('BG_JOBS_REDIS_HOST'),
port: AppEnv.integer('BG_JOBS_REDIS_PORT'),
},
],
{
slotsRefreshTimeout: 5000,
dnsLookup: (address, callback) => callback(null, address),
redisOptions: {
username: AppEnv.string('BG_JOBS_REDIS_USERNAME'),
password: AppEnv.string('BG_JOBS_REDIS_PASSWORD'),
tls: AppEnv.isProduction ? {} : undefined,
},
enableOfflineQueue: false,
},
)
: new Redis({
host: process.env.BACKGROUND_JOBS_REDIS_HOST || 'localhost',
port: parseInt(process.env.BACKGROUND_JOBS_REDIS_PORT || '6379', 10),
username: process.env.BACKGROUND_JOBS_REDIS_USERNAME,
password: process.env.BACKGROUND_JOBS_REDIS_PASSWORD,
tls: AppEnv.isProduction ? {} : undefined,
enableOfflineQueue: false,
}),
defaultWorkerConnection: !process.env.WORKER_SERVICE
? undefined
: AppEnv.isProduction
? new Cluster(
[
{
host: AppEnv.string('BG_JOBS_REDIS_HOST'),
port: AppEnv.integer('BG_JOBS_REDIS_PORT'),
},
],
{
slotsRefreshTimeout: 5000,
dnsLookup: (address, callback) => callback(null, address),
redisOptions: {
username: AppEnv.string('BG_JOBS_REDIS_USERNAME'),
password: AppEnv.string('BG_JOBS_REDIS_PASSWORD'),
tls: AppEnv.isProduction ? {} : undefined,
maxRetriesPerRequest: null,
},
},
)
: new Redis({
host: process.env.BACKGROUND_JOBS_REDIS_HOST || 'localhost',
port: parseInt(
process.env.BACKGROUND_JOBS_REDIS_PORT || '6379',
10,
),
username: process.env.BACKGROUND_JOBS_REDIS_USERNAME,
password: process.env.BACKGROUND_JOBS_REDIS_PASSWORD,
tls: AppEnv.isProduction ? {} : undefined,
maxRetriesPerRequest: null,
}),
})
}
At first glance, this configuration might seem overwhelming to you, but most of this is just basic connection details for the queue and worker redis connections. Since bullmq runs on redis, you must provide redis connections for the queue and worker to leverage. Once the redis connections are set up, the only remaining thing to do is add a configuration for the default worker. Here is that section, isolated for you:
defaultWorkstream: {
// https://docs.bullmq.io/guide/parallelism-and-concurrency
workerCount: os.cpus().length,
concurrency: 100,
},
With this done, any service in your application extending the ApplicationBackgroundedService
or ApplicationScheduledService
classes will automatically send their jobs to the queue belonging to the default workstream.
Named workstreams
If your app demands multiple queues, you will want to leverage multiple workstreams. This will enable you to segment off groups of workers to work on one queue, and one group to work on the other. To do this, you can add a named workstream to the conf/workers.ts
file:
defaultWorkstream: {
// https://docs.bullmq.io/guide/parallelism-and-concurrency
workerCount: os.cpus().length,
concurrency: 100,
},
namedWorkstreams: [
{
name: 'FileImport',
workerCount: 1,
},
]
With a named workstream added to your config, re-run yarn psy sync
to compile the latest psychic types. Once done, you will want to also adjust the background configuration for the services you wish to tap into this queue:
export default class FileImporter extends ApplicationBackgroundedService {
public static get backgroundJobConfig(): BackgroundJobConfig<ApplicationBackgroundedService> {
return { priority: 'not_urgent', workstream: 'FileImport' }
}
...
}