Browse Source

Delete completed/failed jobs directly from bullmq

develop
Chocobozzz 3 days ago
parent
commit
c3b21b68b5
No known key found for this signature in database GPG Key ID: 583A612D890159BE
3 changed files with 61 additions and 9 deletions
  1. +20
    -2
      server/initializers/constants.ts
  2. +39
    -7
      server/lib/job-queue/job-queue.ts
  3. +2
    -0
      server/lib/schedulers/remove-old-jobs-scheduler.ts

+ 20
- 2
server/initializers/constants.ts View File

@@ -239,7 +239,23 @@ const REQUEST_TIMEOUTS = {
REDUNDANCY: JOB_TTL['video-redundancy']
}

const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2 // 2 days
const JOB_REMOVAL_OPTIONS = {
COUNT: 10000, // Max jobs to store

SUCCESS: { // Success jobs
'DEFAULT': parseDurationToMs('2 days'),

'activitypub-http-broadcast-parallel': parseDurationToMs('10 minutes'),
'activitypub-http-unicast': parseDurationToMs('1 hour'),
'videos-views-stats': parseDurationToMs('3 hours'),
'activitypub-refresher': parseDurationToMs('10 hours')
},

FAILURE: { // Failed job
DEFAULT: parseDurationToMs('7 days')
}
}

const VIDEO_IMPORT_TIMEOUT = Math.floor(JOB_TTL['video-import'] * 0.9)

const SCHEDULER_INTERVALS_MS = {
@@ -938,6 +954,8 @@ if (process.env.PRODUCTION_CONSTANTS !== 'true') {
OVERVIEWS.VIDEOS.SAMPLE_THRESHOLD = 2

PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000

JOB_REMOVAL_OPTIONS.SUCCESS['videos-views-stats'] = 10000
}

if (isTestInstance()) {
@@ -1069,7 +1087,7 @@ export {
CRAWL_REQUEST_CONCURRENCY,
DEFAULT_AUDIO_RESOLUTION,
BINARY_CONTENT_TYPES,
JOB_COMPLETED_LIFETIME,
JOB_REMOVAL_OPTIONS,
HTTP_SIGNATURE,
VIDEO_IMPORT_STATES,
VIDEO_CHANNEL_SYNC_STATE,


+ 39
- 7
server/lib/job-queue/job-queue.ts View File

@@ -41,8 +41,16 @@ import {
VideoTranscodingPayload
} from '../../../shared/models'
import { logger } from '../../helpers/logger'
import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
import {
JOB_ATTEMPTS,
JOB_CONCURRENCY,
JOB_REMOVAL_OPTIONS,
JOB_TTL,
REPEAT_JOBS,
WEBSERVER
} from '../../initializers/constants'
import { Hooks } from '../plugins/hooks'
import { Redis } from '../redis'
import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
import { processActivityPubFollow } from './handlers/activitypub-follow'
import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
@@ -63,7 +71,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
import { processVideoStudioEdition } from './handlers/video-studio-edition'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViewsStats } from './handlers/video-views-stats'
import { Redis } from '../redis'
import { parseDurationToMs } from '@server/helpers/core-utils'

export type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -373,7 +381,7 @@ class JobQueue {
})
}

private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
return {
name: 'job',
data: job.payload,
@@ -387,7 +395,9 @@ class JobQueue {
backoff: { delay: 60 * 1000, type: 'exponential' },
attempts: JOB_ATTEMPTS[type],
priority: options.priority,
delay: options.delay
delay: options.delay,

...this.buildJobRemovalOptions(type)
}
}

@@ -482,18 +492,23 @@ class JobQueue {
async removeOldJobs () {
for (const key of Object.keys(this.queues)) {
const queue: Queue = this.queues[key]
await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
await queue.clean(parseDurationToMs('7 days'), 100, 'completed')
await queue.clean(parseDurationToMs('7 days'), 100, 'failed')
}
}

private addRepeatableJobs () {
this.queues['videos-views-stats'].add('job', {}, {
repeat: REPEAT_JOBS['videos-views-stats']
repeat: REPEAT_JOBS['videos-views-stats'],

...this.buildJobRemovalOptions('videos-views-stats')
}).catch(err => logger.error('Cannot add repeatable job.', { err }))

if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
this.queues['activitypub-cleaner'].add('job', {}, {
repeat: REPEAT_JOBS['activitypub-cleaner']
repeat: REPEAT_JOBS['activitypub-cleaner'],

...this.buildJobRemovalOptions('activitypub-cleaner')
}).catch(err => logger.error('Cannot add repeatable job.', { err }))
}
}
@@ -505,6 +520,23 @@ class JobQueue {
return JOB_CONCURRENCY[jobType]
}

private buildJobRemovalOptions (queueName: string) {
return {
removeOnComplete: {
// Wants seconds
age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000,

count: JOB_REMOVAL_OPTIONS.COUNT
},
removeOnFail: {
// Wants seconds
age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000,

count: JOB_REMOVAL_OPTIONS.COUNT / 1000
}
}
}

static get Instance () {
return this.instance || (this.instance = new this())
}


+ 2
- 0
server/lib/schedulers/remove-old-jobs-scheduler.ts View File

@@ -4,6 +4,8 @@ import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
import { JobQueue } from '../job-queue'
import { AbstractScheduler } from './abstract-scheduler'

// FIXME: delete this scheduler in a few versions (introduced in 5.0)
// We introduced job removal directly using bullmq option but we still need to delete old jobs
export class RemoveOldJobsScheduler extends AbstractScheduler {

private static instance: AbstractScheduler


Loading…
Cancel
Save