Browse Source

Prevent job queue to be started before plugins

tags/v4.3.0
Chocobozzz 1 week ago
parent
commit
4404a7c467
No known key found for this signature in database GPG Key ID: 583A612D890159BE
7 changed files with 38 additions and 15 deletions
  1. +1
    -1
      scripts/create-import-video-file-job.ts
  2. +1
    -1
      scripts/create-move-video-storage-job.ts
  3. +1
    -1
      scripts/create-transcoding-job.ts
  4. +1
    -1
      scripts/migrations/peertube-4.0.ts
  5. +1
    -1
      scripts/migrations/peertube-4.2.ts
  6. +6
    -0
      server.ts
  7. +27
    -10
      server/lib/job-queue/job-queue.ts

+ 1
- 1
scripts/create-import-video-file-job.ts View File

@@ -44,7 +44,7 @@ async function run () {
filePath: resolve(options.import)
}

JobQueue.Instance.init(true)
JobQueue.Instance.init()
await JobQueue.Instance.createJob({ type: 'video-file-import', payload: dataInput })
console.log('Import job for video %s created.', video.uuid)
}

+ 1
- 1
scripts/create-move-video-storage-job.ts View File

@@ -37,7 +37,7 @@ run()
async function run () {
await initDatabaseModels(true)

JobQueue.Instance.init(true)
JobQueue.Instance.init()

let ids: number[] = []



+ 1
- 1
scripts/create-transcoding-job.ts View File

@@ -97,7 +97,7 @@ async function run () {
}
}

JobQueue.Instance.init(true)
JobQueue.Instance.init()

video.state = VideoState.TO_TRANSCODE
await video.save()


+ 1
- 1
scripts/migrations/peertube-4.0.ts View File

@@ -21,7 +21,7 @@ async function run () {

await initDatabaseModels(true)

JobQueue.Instance.init(true)
JobQueue.Instance.init()

const ids = await VideoModel.listLocalIds()



+ 1
- 1
scripts/migrations/peertube-4.2.ts View File

@@ -27,7 +27,7 @@ async function run () {
console.log('Generate avatar miniatures from existing avatars.')

await initDatabaseModels(true)
JobQueue.Instance.init(true)
JobQueue.Instance.init()

const accounts: AccountModel[] = await AccountModel.findAll({
include: [


+ 6
- 0
server.ts View File

@@ -351,6 +351,12 @@ async function startApplication () {
ApplicationModel.updateNodeVersions()
.catch(err => logger.error('Cannot update node versions.', { err }))

JobQueue.Instance.start()
.catch(err => {
logger.error('Cannot start job queue.', { err })
process.exit(-1)
})

logger.info('HTTP server listening on %s:%d', hostname, port)
logger.info('Web server: %s', WEBSERVER.URL)



+ 27
- 10
server/lib/job-queue/job-queue.ts View File

@@ -168,7 +168,7 @@ class JobQueue {
private constructor () {
}

init (produceOnly = false) {
init () {
// Already initialized
if (this.initialized === true) return
this.initialized = true
@@ -176,10 +176,10 @@ class JobQueue {
this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST

for (const handlerName of (Object.keys(handlers) as JobType[])) {
this.buildWorker(handlerName, produceOnly)
this.buildWorker(handlerName)
this.buildQueue(handlerName)
this.buildQueueScheduler(handlerName, produceOnly)
this.buildQueueEvent(handlerName, produceOnly)
this.buildQueueScheduler(handlerName)
this.buildQueueEvent(handlerName)
}

this.flowProducer = new FlowProducer({
@@ -191,9 +191,9 @@ class JobQueue {
this.addRepeatableJobs()
}

private buildWorker (handlerName: JobType, produceOnly: boolean) {
private buildWorker (handlerName: JobType) {
const workerOptions: WorkerOptions = {
autorun: !produceOnly,
autorun: false,
concurrency: this.getJobConcurrency(handlerName),
prefix: this.jobRedisPrefix,
connection: this.getRedisConnection()
@@ -246,9 +246,9 @@ class JobQueue {
this.queues[handlerName] = queue
}

private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
private buildQueueScheduler (handlerName: JobType) {
const queueSchedulerOptions: QueueSchedulerOptions = {
autorun: !produceOnly,
autorun: false,
connection: this.getRedisConnection(),
prefix: this.jobRedisPrefix,
maxStalledCount: 10
@@ -260,9 +260,9 @@ class JobQueue {
this.queueSchedulers[handlerName] = queueScheduler
}

private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
private buildQueueEvent (handlerName: JobType) {
const queueEventsOptions: QueueEventsOptions = {
autorun: !produceOnly,
autorun: false,
connection: this.getRedisConnection(),
prefix: this.jobRedisPrefix
}
@@ -304,6 +304,23 @@ class JobQueue {
return Promise.all(promises)
}

start () {
const promises = Object.keys(this.workers)
.map(handlerName => {
const worker: Worker = this.workers[handlerName]
const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
const queueEvent: QueueEvents = this.queueEvents[handlerName]

return Promise.all([
worker.run(),
queueScheduler.run(),
queueEvent.run()
])
})

return Promise.all(promises)
}

async pause () {
for (const handlerName of Object.keys(this.workers)) {
const worker: Worker = this.workers[handlerName]


Loading…
Cancel
Save