|
|
@@ -2,6 +2,7 @@ import * as kue from 'kue' |
|
|
|
import { JobType, JobState } from '../../../shared/models' |
|
|
|
import { logger } from '../../helpers/logger' |
|
|
|
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' |
|
|
|
import { Redis } from '../redis' |
|
|
|
import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
|
|
|
import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
|
|
|
import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
|
|
@@ -29,6 +30,7 @@ class JobQueue { |
|
|
|
|
|
|
|
private jobQueue: kue.Queue |
|
|
|
private initialized = false |
|
|
|
private jobRedisPrefix: string |
|
|
|
|
|
|
|
private constructor () {} |
|
|
|
|
|
|
@@ -37,8 +39,10 @@ class JobQueue { |
|
|
|
if (this.initialized === true) return |
|
|
|
this.initialized = true |
|
|
|
|
|
|
|
this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST |
|
|
|
|
|
|
|
this.jobQueue = kue.createQueue({ |
|
|
|
prefix: 'q-' + CONFIG.WEBSERVER.HOST, |
|
|
|
prefix: this.jobRedisPrefix, |
|
|
|
redis: { |
|
|
|
host: CONFIG.REDIS.HOSTNAME, |
|
|
|
port: CONFIG.REDIS.PORT, |
|
|
@@ -83,14 +87,14 @@ class JobQueue { |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
listForApi (state: JobState, start: number, count: number, sort: string) { |
|
|
|
return new Promise<kue.Job[]>((res, rej) => { |
|
|
|
kue.Job.rangeByState(state, start, start + count - 1, sort, (err, jobs) => { |
|
|
|
if (err) return rej(err) |
|
|
|
async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC') { |
|
|
|
const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) |
|
|
|
|
|
|
|
return res(jobs) |
|
|
|
}) |
|
|
|
}) |
|
|
|
const jobPromises = jobStrings |
|
|
|
.map(s => s.split('|')) |
|
|
|
.map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) |
|
|
|
|
|
|
|
return Promise.all(jobPromises) |
|
|
|
} |
|
|
|
|
|
|
|
count (state: JobState) { |
|
|
@@ -144,6 +148,16 @@ class JobQueue { |
|
|
|
return Promise.all(promises) |
|
|
|
} |
|
|
|
|
|
|
|
private getJob (id: number) { |
|
|
|
return new Promise((res, rej) => { |
|
|
|
kue.Job.get(id, (err, job) => { |
|
|
|
if (err) return rej(err) |
|
|
|
|
|
|
|
return res(job) |
|
|
|
}) |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
static get Instance () { |
|
|
|
return this.instance || (this.instance = new this()) |
|
|
|
} |
|
|
|