@@ -3,11 +3,12 @@ import { HttpClient, HttpParams } from '@angular/common/http' | |||
import { Injectable } from '@angular/core' | |||
import { SortMeta } from 'primeng/api' | |||
import { Observable } from 'rxjs' | |||
import { ResultList } from '../../../../../../shared' | |||
import { JobType, ResultList } from '../../../../../../shared' | |||
import { JobState } from '../../../../../../shared/models' | |||
import { Job } from '../../../../../../shared/models/server/job.model' | |||
import { environment } from '../../../../environments/environment' | |||
import { RestExtractor, RestPagination, RestService } from '../../../shared' | |||
import { JobTypeClient } from '../../../../types/job-type-client.type' | |||
@Injectable() | |||
export class JobService { | |||
@@ -19,10 +20,12 @@ export class JobService { | |||
private restExtractor: RestExtractor | |||
) {} | |||
getJobs (state: JobState, pagination: RestPagination, sort: SortMeta): Observable<ResultList<Job>> { | |||
getJobs (state: JobState, jobType: JobTypeClient, pagination: RestPagination, sort: SortMeta): Observable<ResultList<Job>> { | |||
let params = new HttpParams() | |||
params = this.restService.addRestGetParams(params, pagination, sort) | |||
if (jobType !== 'all') params = params.append('jobType', jobType) | |||
return this.authHttp.get<ResultList<Job>>(JobService.BASE_JOB_URL + '/' + state, { params }) | |||
.pipe( | |||
map(res => { | |||
@@ -1,10 +1,22 @@ | |||
<div class="admin-sub-header"> | |||
<div i18n class="form-sub-title">Jobs list</div> | |||
<div class="peertube-select-container"> | |||
<select [(ngModel)]="jobState" (ngModelChange)="onJobStateChanged()"> | |||
<option *ngFor="let state of jobStates" [value]="state">{{ state }}</option> | |||
</select> | |||
<div class="select-filter-block"> | |||
<label for="jobType">Job type</label> | |||
<div class="peertube-select-container"> | |||
<select id="jobType" name="jobType" [(ngModel)]="jobType" (ngModelChange)="onJobStateOrTypeChanged()"> | |||
<option *ngFor="let jobType of jobTypes" [value]="jobType">{{ jobType }}</option> | |||
</select> | |||
</div> | |||
</div> | |||
<div class="select-filter-block"> | |||
<label for="jobState">Job state</label> | |||
<div class="peertube-select-container"> | |||
<select id="jobState" name="jobState" [(ngModel)]="jobState" (ngModelChange)="onJobStateOrTypeChanged()"> | |||
<option *ngFor="let state of jobStates" [value]="state">{{ state }}</option> | |||
</select> | |||
</div> | |||
</div> | |||
</div> | |||
@@ -1,8 +1,22 @@ | |||
@import '_variables'; | |||
@import '_mixins'; | |||
.peertube-select-container { | |||
@include peertube-select-container(auto); | |||
.admin-sub-header { | |||
align-items: flex-end; | |||
.select-filter-block { | |||
&:not(:last-child) { | |||
margin-right: 10px; | |||
} | |||
label { | |||
margin-bottom: 2px; | |||
} | |||
.peertube-select-container { | |||
@include peertube-select-container(auto); | |||
} | |||
} | |||
} | |||
pre { | |||
@@ -2,11 +2,12 @@ import { Component, OnInit } from '@angular/core' | |||
import { peertubeLocalStorage } from '@app/shared/misc/peertube-local-storage' | |||
import { Notifier } from '@app/core' | |||
import { SortMeta } from 'primeng/api' | |||
import { Job } from '../../../../../../shared/index' | |||
import { Job, JobType } from '../../../../../../shared/index' | |||
import { JobState } from '../../../../../../shared/models' | |||
import { RestPagination, RestTable } from '../../../shared' | |||
import { JobService } from './job.service' | |||
import { I18n } from '@ngx-translate/i18n-polyfill' | |||
import { JobTypeClient } from '../../../../types/job-type-client.type' | |||
@Component({ | |||
selector: 'my-jobs', | |||
@@ -15,9 +16,26 @@ import { I18n } from '@ngx-translate/i18n-polyfill' | |||
}) | |||
export class JobsComponent extends RestTable implements OnInit { | |||
private static JOB_STATE_LOCAL_STORAGE_STATE = 'jobs-list-state' | |||
private static JOB_STATE_LOCAL_STORAGE_TYPE = 'jobs-list-type' | |||
jobState: JobState = 'waiting' | |||
jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ] | |||
jobType: JobTypeClient = 'all' | |||
jobTypes: JobTypeClient[] = [ | |||
'all', | |||
'activitypub-follow', | |||
'activitypub-http-broadcast', | |||
'activitypub-http-fetcher', | |||
'activitypub-http-unicast', | |||
'email', | |||
'video-transcoding', | |||
'video-file-import', | |||
'video-import', | |||
'videos-views', | |||
'activitypub-refresher' | |||
] | |||
jobs: Job[] = [] | |||
totalRecords: number | |||
rowsPerPage = 10 | |||
@@ -33,20 +51,20 @@ export class JobsComponent extends RestTable implements OnInit { | |||
} | |||
ngOnInit () { | |||
this.loadJobState() | |||
this.loadJobStateAndType() | |||
this.initialize() | |||
} | |||
onJobStateChanged () { | |||
onJobStateOrTypeChanged () { | |||
this.pagination.start = 0 | |||
this.loadData() | |||
this.saveJobState() | |||
this.saveJobStateAndType() | |||
} | |||
protected loadData () { | |||
this.jobsService | |||
.getJobs(this.jobState, this.pagination, this.sort) | |||
.getJobs(this.jobState, this.jobType, this.pagination, this.sort) | |||
.subscribe( | |||
resultList => { | |||
this.jobs = resultList.data | |||
@@ -57,13 +75,16 @@ export class JobsComponent extends RestTable implements OnInit { | |||
) | |||
} | |||
private loadJobState () { | |||
const result = peertubeLocalStorage.getItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_STATE) | |||
private loadJobStateAndType () { | |||
const state = peertubeLocalStorage.getItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_STATE) | |||
if (state) this.jobState = state as JobState | |||
if (result) this.jobState = result as JobState | |||
const type = peertubeLocalStorage.getItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_TYPE) | |||
if (type) this.jobType = type as JobType | |||
} | |||
private saveJobState () { | |||
private saveJobStateAndType () { | |||
peertubeLocalStorage.setItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_STATE, this.jobState) | |||
peertubeLocalStorage.setItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_TYPE, this.jobType) | |||
} | |||
} |
@@ -0,0 +1,3 @@ | |||
import { JobType } from '@shared/models' | |||
export type JobTypeClient = 'all' | JobType |
@@ -24,7 +24,7 @@ jobsRouter.get('/:state', | |||
jobsSortValidator, | |||
setDefaultSort, | |||
setDefaultPagination, | |||
asyncMiddleware(listJobsValidator), | |||
listJobsValidator, | |||
asyncMiddleware(listJobs) | |||
) | |||
@@ -39,8 +39,15 @@ export { | |||
async function listJobs (req: express.Request, res: express.Response) { | |||
const state = req.params.state as JobState | |||
const asc = req.query.sort === 'createdAt' | |||
const jobType = req.query.jobType | |||
const jobs = await JobQueue.Instance.listForApi(state, req.query.start, req.query.count, asc) | |||
const jobs = await JobQueue.Instance.listForApi({ | |||
state, | |||
start: req.query.start, | |||
count: req.query.count, | |||
asc, | |||
jobType | |||
}) | |||
const total = await JobQueue.Instance.count(state) | |||
const result: ResultList<any> = { | |||
@@ -1,14 +1,20 @@ | |||
import { JobState } from '../../../shared/models' | |||
import { exists } from './misc' | |||
import { jobTypes } from '@server/lib/job-queue/job-queue' | |||
const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ] | |||
function isValidJobState (value: JobState) { | |||
return exists(value) && jobStates.indexOf(value) !== -1 | |||
return exists(value) && jobStates.includes(value) | |||
} | |||
function isValidJobType (value: any) { | |||
return exists(value) && jobTypes.includes(value) | |||
} | |||
// --------------------------------------------------------------------------- | |||
export { | |||
isValidJobState | |||
isValidJobState, | |||
isValidJobType | |||
} |
@@ -121,11 +121,20 @@ class JobQueue { | |||
return queue.add(obj.payload, jobArgs) | |||
} | |||
async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> { | |||
async listForApi (options: { | |||
state: JobState, | |||
start: number, | |||
count: number, | |||
asc?: boolean, | |||
jobType: JobType | |||
}): Promise<Bull.Job[]> { | |||
const { state, start, count, asc, jobType } = options | |||
let results: Bull.Job[] = [] | |||
const filteredJobTypes = this.filterJobTypes(jobType) | |||
// TODO: optimize | |||
for (const jobType of jobTypes) { | |||
for (const jobType of filteredJobTypes) { | |||
const queue = this.queues[ jobType ] | |||
if (queue === undefined) { | |||
logger.error('Unknown queue %s to list jobs.', jobType) | |||
@@ -149,10 +158,12 @@ class JobQueue { | |||
return results.slice(start, start + count) | |||
} | |||
async count (state: JobState): Promise<number> { | |||
async count (state: JobState, jobType?: JobType): Promise<number> { | |||
let total = 0 | |||
for (const type of jobTypes) { | |||
const filteredJobTypes = this.filterJobTypes(jobType) | |||
for (const type of filteredJobTypes) { | |||
const queue = this.queues[ type ] | |||
if (queue === undefined) { | |||
logger.error('Unknown queue %s to count jobs.', type) | |||
@@ -180,6 +191,12 @@ class JobQueue { | |||
}) | |||
} | |||
private filterJobTypes (jobType?: JobType) { | |||
if (!jobType) return jobTypes | |||
return jobTypes.filter(t => t === jobType) | |||
} | |||
static get Instance () { | |||
return this.instance || (this.instance = new this()) | |||
} | |||
@@ -188,5 +205,6 @@ class JobQueue { | |||
// --------------------------------------------------------------------------- | |||
export { | |||
jobTypes, | |||
JobQueue | |||
} |
@@ -1,13 +1,17 @@ | |||
import * as express from 'express' | |||
import { param } from 'express-validator' | |||
import { isValidJobState } from '../../helpers/custom-validators/jobs' | |||
import { param, query } from 'express-validator' | |||
import { isValidJobState, isValidJobType } from '../../helpers/custom-validators/jobs' | |||
import { logger } from '../../helpers/logger' | |||
import { areValidationErrors } from './utils' | |||
const listJobsValidator = [ | |||
param('state').custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'), | |||
param('state') | |||
.custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'), | |||
query('jobType') | |||
.optional() | |||
.custom(isValidJobType).withMessage('Should have a valid job state'), | |||
async (req: express.Request, res: express.Response, next: express.NextFunction) => { | |||
(req: express.Request, res: express.Response, next: express.NextFunction) => { | |||
logger.debug('Checking listJobsValidator parameters.', { parameters: req.params }) | |||
if (areValidationErrors(req, res)) return | |||
@@ -51,6 +51,17 @@ describe('Test jobs API validators', function () { | |||
}) | |||
}) | |||
it('Should fail with an incorrect job type', async function () { | |||
await makeGetRequest({ | |||
url: server.url, | |||
token: server.accessToken, | |||
path, | |||
query: { | |||
jobType: 'toto' | |||
} | |||
}) | |||
}) | |||
it('Should fail with a bad start pagination', async function () { | |||
await checkBadStartPagination(server.url, path, server.accessToken) | |||
}) | |||
@@ -79,6 +90,7 @@ describe('Test jobs API validators', function () { | |||
statusCodeExpected: 403 | |||
}) | |||
}) | |||
}) | |||
after(async function () { | |||
@@ -184,7 +184,14 @@ describe('Test handle downs', function () { | |||
const states: JobState[] = [ 'waiting', 'active' ] | |||
for (const state of states) { | |||
const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt') | |||
const res = await getJobsListPaginationAndSort({ | |||
url: servers[ 0 ].url, | |||
accessToken: servers[ 0 ].accessToken, | |||
state: state, | |||
start: 0, | |||
count: 50, | |||
sort: '-createdAt' | |||
}) | |||
expect(res.body.data).to.have.length(0) | |||
} | |||
}) | |||
@@ -41,20 +41,46 @@ describe('Test jobs', function () { | |||
expect(res.body.data).to.have.length.above(2) | |||
}) | |||
it('Should list jobs with sort and pagination', async function () { | |||
const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'completed', 1, 2, 'createdAt') | |||
expect(res.body.total).to.be.above(2) | |||
expect(res.body.data).to.have.lengthOf(2) | |||
it('Should list jobs with sort, pagination and job type', async function () { | |||
{ | |||
const res = await getJobsListPaginationAndSort({ | |||
url: servers[ 1 ].url, | |||
accessToken: servers[ 1 ].accessToken, | |||
state: 'completed', | |||
start: 1, | |||
count: 2, | |||
sort: 'createdAt' | |||
}) | |||
expect(res.body.total).to.be.above(2) | |||
expect(res.body.data).to.have.lengthOf(2) | |||
let job: Job = res.body.data[ 0 ] | |||
// Skip repeat jobs | |||
if (job.type === 'videos-views') job = res.body.data[ 1 ] | |||
expect(job.state).to.equal('completed') | |||
expect(job.type.startsWith('activitypub-')).to.be.true | |||
expect(dateIsValid(job.createdAt as string)).to.be.true | |||
expect(dateIsValid(job.processedOn as string)).to.be.true | |||
expect(dateIsValid(job.finishedOn as string)).to.be.true | |||
} | |||
let job = res.body.data[0] | |||
// Skip repeat jobs | |||
if (job.type === 'videos-views') job = res.body.data[1] | |||
{ | |||
const res = await getJobsListPaginationAndSort({ | |||
url: servers[ 1 ].url, | |||
accessToken: servers[ 1 ].accessToken, | |||
state: 'completed', | |||
start: 0, | |||
count: 100, | |||
sort: 'createdAt', | |||
jobType: 'activitypub-http-broadcast' | |||
}) | |||
expect(res.body.total).to.be.above(2) | |||
expect(job.state).to.equal('completed') | |||
expect(job.type.startsWith('activitypub-')).to.be.true | |||
expect(dateIsValid(job.createdAt)).to.be.true | |||
expect(dateIsValid(job.processedOn)).to.be.true | |||
expect(dateIsValid(job.finishedOn)).to.be.true | |||
for (const j of res.body.data as Job[]) { | |||
expect(j.type).to.equal('activitypub-http-broadcast') | |||
} | |||
} | |||
}) | |||
after(async function () { | |||
@@ -354,7 +354,14 @@ async function isTherePendingRequests (servers: ServerInfo[]) { | |||
// Check if each server has pending request | |||
for (const server of servers) { | |||
for (const state of states) { | |||
const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt') | |||
const p = getJobsListPaginationAndSort({ | |||
url: server.url, | |||
accessToken: server.accessToken, | |||
state: state, | |||
start: 0, | |||
count: 10, | |||
sort: '-createdAt' | |||
}) | |||
.then(res => { | |||
if (res.body.total > 0) pendingRequests = true | |||
}) | |||
@@ -1,7 +1,8 @@ | |||
import * as request from 'supertest' | |||
import { Job, JobState } from '../../models' | |||
import { Job, JobState, JobType } from '../../models' | |||
import { wait } from '../miscs/miscs' | |||
import { ServerInfo } from './servers' | |||
import { makeGetRequest } from '@shared/extra-utils' | |||
function getJobsList (url: string, accessToken: string, state: JobState) { | |||
const path = '/api/v1/jobs/' + state | |||
@@ -14,18 +15,32 @@ function getJobsList (url: string, accessToken: string, state: JobState) { | |||
.expect('Content-Type', /json/) | |||
} | |||
function getJobsListPaginationAndSort (url: string, accessToken: string, state: JobState, start: number, count: number, sort: string) { | |||
function getJobsListPaginationAndSort (options: { | |||
url: string, | |||
accessToken: string, | |||
state: JobState, | |||
start: number, | |||
count: number, | |||
sort: string, | |||
jobType?: JobType | |||
}) { | |||
const { url, accessToken, state, start, count, sort, jobType } = options | |||
const path = '/api/v1/jobs/' + state | |||
return request(url) | |||
.get(path) | |||
.query({ start }) | |||
.query({ count }) | |||
.query({ sort }) | |||
.set('Accept', 'application/json') | |||
.set('Authorization', 'Bearer ' + accessToken) | |||
.expect(200) | |||
.expect('Content-Type', /json/) | |||
const query = { | |||
start, | |||
count, | |||
sort, | |||
jobType | |||
} | |||
return makeGetRequest({ | |||
url, | |||
path, | |||
token: accessToken, | |||
statusCodeExpected: 200, | |||
query | |||
}) | |||
} | |||
async function waitJobs (serversArg: ServerInfo[] | ServerInfo) { | |||
@@ -44,7 +59,14 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) { | |||
// Check if each server has pending request | |||
for (const server of servers) { | |||
for (const state of states) { | |||
const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt') | |||
const p = getJobsListPaginationAndSort({ | |||
url: server.url, | |||
accessToken: server.accessToken, | |||
state: state, | |||
start: 0, | |||
count: 10, | |||
sort: '-createdAt' | |||
}) | |||
.then(res => res.body.data) | |||
.then((jobs: Job[]) => jobs.filter(j => j.type !== 'videos-views')) | |||
.then(jobs => { | |||
@@ -17,7 +17,7 @@ export interface Job { | |||
type: JobType | |||
data: any, | |||
error: any, | |||
createdAt: Date | |||
finishedOn: Date | |||
processedOn: Date | |||
createdAt: Date | string | |||
finishedOn: Date | string | |||
processedOn: Date | string | |||
} |