|
|
@@ -2,7 +2,7 @@ import * as Bull from 'bull' |
|
|
|
import { JobState, JobType } from '../../../shared/models' |
|
|
|
import { logger } from '../../helpers/logger' |
|
|
|
import { Redis } from '../redis' |
|
|
|
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' |
|
|
|
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers' |
|
|
|
import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
|
|
|
import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
|
|
|
import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
|
|
@@ -79,6 +79,7 @@ class JobQueue { |
|
|
|
const handler = handlers[handlerName] |
|
|
|
|
|
|
|
queue.process(JOB_CONCURRENCY[handlerName], handler) |
|
|
|
.catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) |
|
|
|
|
|
|
|
queue.on('failed', (job, err) => { |
|
|
|
logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) |
|
|
@@ -109,11 +110,8 @@ class JobQueue { |
|
|
|
|
|
|
|
const jobArgs: Bull.JobOptions = { |
|
|
|
backoff: { delay: 60 * 1000, type: 'exponential' }, |
|
|
|
attempts: JOB_ATTEMPTS[obj.type] |
|
|
|
} |
|
|
|
|
|
|
|
if (jobsWithRequestTimeout[obj.type] === true) { |
|
|
|
jobArgs.timeout = JOB_REQUEST_TTL |
|
|
|
attempts: JOB_ATTEMPTS[obj.type], |
|
|
|
timeout: JOB_TTL[obj.type] |
|
|
|
} |
|
|
|
|
|
|
|
return queue.add(obj.payload, jobArgs) |
|
|
|