Browse Source

Move job queue to redis

We'll use it as cache in the future.

/!\ You'll loose your old jobs (pending jobs too) so upgrade only when
you don't have pending job anymore.
tags/v0.0.16-alpha
Chocobozzz 4 years ago
parent
commit
94a5ff8a4a
No known key found for this signature in database GPG Key ID: 583A612D890159BE
60 changed files with 985 additions and 696 deletions
  1. +1
    -0
      README.md
  2. +12
    -5
      client/src/app/+admin/jobs/jobs-list/jobs-list.component.html
  3. +7
    -0
      client/src/app/+admin/jobs/jobs-list/jobs-list.component.scss
  4. +9
    -2
      client/src/app/+admin/jobs/jobs-list/jobs-list.component.ts
  5. +5
    -4
      client/src/app/+admin/jobs/shared/job.service.ts
  6. +5
    -0
      config/default.yaml
  7. +5
    -0
      config/production.yaml.example
  8. +2
    -0
      package.json
  9. +1
    -0
      scripts/clean/server/test.sh
  10. +20
    -2
      scripts/parse-log.ts
  11. +4
    -4
      server.ts
  12. +32
    -6
      server/controllers/api/jobs.ts
  13. +1
    -1
      server/controllers/api/server/follows.ts
  14. +13
    -11
      server/controllers/api/videos/index.ts
  15. +14
    -0
      server/helpers/custom-validators/jobs.ts
  16. +1
    -0
      server/helpers/database-utils.ts
  17. +24
    -25
      server/initializers/constants.ts
  18. +0
    -2
      server/initializers/database.ts
  19. +2
    -3
      server/initializers/migrations/0100-activitypub.ts
  20. +18
    -0
      server/initializers/migrations/0180-job-table-delete.ts
  21. +34
    -25
      server/lib/activitypub/actor.ts
  22. +4
    -5
      server/lib/activitypub/fetch.ts
  23. +1
    -1
      server/lib/activitypub/process/process-accept.ts
  24. +1
    -1
      server/lib/activitypub/process/process-follow.ts
  25. +12
    -14
      server/lib/activitypub/send/misc.ts
  26. +2
    -3
      server/lib/activitypub/send/send-accept.ts
  27. +1
    -1
      server/lib/activitypub/send/send-announce.ts
  28. +14
    -8
      server/lib/activitypub/send/send-create.ts
  29. +2
    -3
      server/lib/activitypub/send/send-follow.ts
  30. +1
    -1
      server/lib/activitypub/send/send-like.ts
  31. +4
    -10
      server/lib/activitypub/send/send-undo.ts
  32. +14
    -18
      server/lib/job-queue/handlers/activitypub-http-broadcast.ts
  33. +11
    -16
      server/lib/job-queue/handlers/activitypub-http-fetcher.ts
  34. +43
    -0
      server/lib/job-queue/handlers/activitypub-http-unicast.ts
  35. +39
    -0
      server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
  36. +40
    -20
      server/lib/job-queue/handlers/video-file.ts
  37. +1
    -0
      server/lib/job-queue/index.ts
  38. +124
    -0
      server/lib/job-queue/job-queue.ts
  39. +0
    -94
      server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
  40. +0
    -50
      server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
  41. +0
    -1
      server/lib/jobs/activitypub-http-job-scheduler/index.ts
  42. +0
    -2
      server/lib/jobs/index.ts
  43. +0
    -144
      server/lib/jobs/job-scheduler.ts
  44. +0
    -1
      server/lib/jobs/transcoding-job-scheduler/index.ts
  45. +0
    -23
      server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
  46. +0
    -48
      server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
  47. +19
    -0
      server/lib/schedulers/remove-old-jobs-scheduler.ts
  48. +23
    -0
      server/middlewares/validators/jobs.ts
  49. +0
    -80
      server/models/job/job.ts
  50. +10
    -1
      server/tests/api/check-params/jobs.ts
  51. +5
    -5
      server/tests/api/server/handle-down.ts
  52. +5
    -5
      server/tests/api/server/jobs.ts
  53. +11
    -10
      server/tests/api/videos/multiple-servers.ts
  54. +9
    -12
      server/tests/real-world/real-world.ts
  55. +5
    -4
      server/tests/utils/server/jobs.ts
  56. +9
    -5
      shared/models/job.model.ts
  57. +2
    -2
      support/doc/dependencies.md
  58. +1
    -0
      support/doc/development/server/code.md
  59. +7
    -1
      tsconfig.json
  60. +355
    -17
      yarn.lock

+ 1
- 0
README.md View File

@@ -131,6 +131,7 @@ BitTorrent) inside the web browser, as of today.

* nginx
* PostgreSQL
* Redis
* **NodeJS >= 8.x**
* yarn
* OpenSSL (cli)


+ 12
- 5
client/src/app/+admin/jobs/jobs-list/jobs-list.component.html View File

@@ -1,20 +1,27 @@
<div class="admin-sub-header">
<div class="admin-sub-title">Jobs list</div>

<div class="peertube-select-container">
<select [(ngModel)]="jobState" (ngModelChange)="onJobStateChanged()">
<option *ngFor="let state of jobStates" [value]="state">{{ state }}</option>
</select>
</div>
</div>



<p-dataTable
[value]="jobs" [lazy]="true" [paginator]="true" [totalRecords]="totalRecords" [rows]="rowsPerPage"
sortField="createdAt" (onLazyLoad)="loadLazy($event)" [scrollable]="true" [virtualScroll]="true" [scrollHeight]="scrollHeight"
>
<p-column field="id" header="ID" [style]="{ width: '60px' }"></p-column>
<p-column field="category" header="Category" [style]="{ width: '130px' }"></p-column>
<p-column field="handlerName" header="Handler name" [style]="{ width: '210px' }"></p-column>
<p-column header="Input data">
<p-column field="type" header="Type" [style]="{ width: '210px' }"></p-column>
<p-column field="state" header="State" [style]="{ width: '130px' }"></p-column>
<p-column header="Payload">
<ng-template pTemplate="body" let-job="rowData">
<pre>{{ job.handlerInputData }}</pre>
<pre>{{ job.data }}</pre>
</ng-template>
</p-column>
<p-column field="state" header="State" [style]="{ width: '100px' }"></p-column>
<p-column field="createdAt" header="Created date" [sortable]="true" [style]="{ width: '250px' }"></p-column>
<p-column field="updatedAt" header="Updated date" [style]="{ width: '250px' }"></p-column>
</p-dataTable>

+ 7
- 0
client/src/app/+admin/jobs/jobs-list/jobs-list.component.scss View File

@@ -1,3 +1,10 @@
@import '_variables';
@import '_mixins';

.peertube-select-container {
@include peertube-select-container(auto);
}

pre {
font-size: 11px;
}

+ 9
- 2
client/src/app/+admin/jobs/jobs-list/jobs-list.component.ts View File

@@ -2,6 +2,7 @@ import { Component, OnInit } from '@angular/core'
import { NotificationsService } from 'angular2-notifications'
import { SortMeta } from 'primeng/primeng'
import { Job } from '../../../../../../shared/index'
import { JobState } from '../../../../../../shared/models'
import { RestPagination, RestTable } from '../../../shared'
import { viewportHeight } from '../../../shared/misc/utils'
import { JobService } from '../shared'
@@ -13,10 +14,12 @@ import { RestExtractor } from '../../../shared/rest/rest-extractor.service'
styleUrls: [ './jobs-list.component.scss' ]
})
export class JobsListComponent extends RestTable implements OnInit {
jobState: JobState = 'inactive'
jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ]
jobs: Job[] = []
totalRecords = 0
rowsPerPage = 20
sort: SortMeta = { field: 'createdAt', order: 1 }
sort: SortMeta = { field: 'createdAt', order: -1 }
pagination: RestPagination = { count: this.rowsPerPage, start: 0 }
scrollHeight = ''

@@ -33,9 +36,13 @@ export class JobsListComponent extends RestTable implements OnInit {
this.scrollHeight = (viewportHeight() - 380) + 'px'
}

onJobStateChanged () {
this.loadData()
}

protected loadData () {
this.jobsService
.getJobs(this.pagination, this.sort)
.getJobs(this.jobState, this.pagination, this.sort)
.subscribe(
resultList => {
this.jobs = resultList.data


+ 5
- 4
client/src/app/+admin/jobs/shared/job.service.ts View File

@@ -5,6 +5,7 @@ import 'rxjs/add/operator/catch'
import 'rxjs/add/operator/map'
import { Observable } from 'rxjs/Observable'
import { ResultList } from '../../../../../../shared'
import { JobState } from '../../../../../../shared/models'
import { Job } from '../../../../../../shared/models/job.model'
import { environment } from '../../../../environments/environment'
import { RestExtractor, RestPagination, RestService } from '../../../shared'
@@ -19,19 +20,19 @@ export class JobService {
private restExtractor: RestExtractor
) {}

getJobs (pagination: RestPagination, sort: SortMeta): Observable<ResultList<Job>> {
getJobs (state: JobState, pagination: RestPagination, sort: SortMeta): Observable<ResultList<Job>> {
let params = new HttpParams()
params = this.restService.addRestGetParams(params, pagination, sort)

return this.authHttp.get<ResultList<Job>>(JobService.BASE_JOB_URL, { params })
return this.authHttp.get<ResultList<Job>>(JobService.BASE_JOB_URL + '/' + state, { params })
.map(res => this.restExtractor.convertResultListDateToHuman(res))
.map(res => this.restExtractor.applyToResultListData(res, this.prettyPrintData))
.catch(err => this.restExtractor.handleError(err))
}

private prettyPrintData (obj: Job) {
const handlerInputData = JSON.stringify(obj.handlerInputData, null, 2)
const data = JSON.stringify(obj.data, null, 2)

return Object.assign(obj, { handlerInputData })
return Object.assign(obj, { data })
}
}

+ 5
- 0
config/default.yaml View File

@@ -14,6 +14,11 @@ database:
username: 'peertube'
password: 'peertube'

redis:
hostname: 'localhost'
port: 6379
auth: null

# From the project root directory
storage:
avatars: 'storage/avatars/'


+ 5
- 0
config/production.yaml.example View File

@@ -15,6 +15,11 @@ database:
username: 'peertube'
password: 'peertube'

redis:
hostname: 'localhost'
port: 6379
auth: null

# From the project root directory
storage:
avatars: '/var/www/peertube/storage/avatars/'


+ 2
- 0
package.json View File

@@ -70,6 +70,7 @@
"js-yaml": "^3.5.4",
"jsonld": "^0.5.12",
"jsonld-signatures": "https://github.com/Chocobozzz/jsonld-signatures#rsa2017",
"kue": "^0.11.6",
"lodash": "^4.11.1",
"magnet-uri": "^5.1.4",
"mkdirp": "^0.5.1",
@@ -103,6 +104,7 @@
"@types/chai": "^4.0.4",
"@types/config": "^0.0.33",
"@types/express": "^4.0.35",
"@types/kue": "^0.11.8",
"@types/lodash": "^4.14.64",
"@types/magnet-uri": "^5.1.1",
"@types/mkdirp": "^0.5.1",


+ 1
- 0
scripts/clean/server/test.sh View File

@@ -6,4 +6,5 @@ for i in $(seq 1 6); do
rm -f "./config/local-test.json"
rm -f "./config/local-test-$i.json"
createdb "peertube_test$i"
redis-cli KEYS "q-localhost:900$i*" | grep -v empty | xargs --no-run-if-empty redis-cli DEL
done

+ 20
- 2
scripts/parse-log.ts View File

@@ -2,16 +2,34 @@ import { createReadStream } from 'fs'
import { join } from 'path'
import { createInterface } from 'readline'
import * as winston from 'winston'
import { labelFormatter, loggerFormat, timestampFormatter } from '../server/helpers/logger'
import { labelFormatter } from '../server/helpers/logger'
import { CONFIG } from '../server/initializers/constants'

const excludedKeys = {
level: true,
message: true,
splat: true,
timestamp: true,
label: true
}
function keysExcluder (key, value) {
return excludedKeys[key] === true ? undefined : value
}

const loggerFormat = winston.format.printf((info) => {
let additionalInfos = JSON.stringify(info, keysExcluder, 2)
if (additionalInfos === '{}') additionalInfos = ''
else additionalInfos = ' ' + additionalInfos

return `[${info.label}] ${new Date(info.timestamp).toISOString()} ${info.level}: ${info.message}${additionalInfos}`
})

const logger = new winston.createLogger({
transports: [
new winston.transports.Console({
level: 'debug',
stderrLevels: [],
format: winston.format.combine(
timestampFormatter,
winston.format.splat(),
labelFormatter,
winston.format.colorize(),


+ 4
- 4
server.ts View File

@@ -53,10 +53,11 @@ migrate()

// ----------- PeerTube modules -----------
import { installApplication } from './server/initializers'
import { activitypubHttpJobScheduler, transcodingJobScheduler } from './server/lib/jobs'
import { JobQueue } from './server/lib/job-queue'
import { VideosPreviewCache } from './server/lib/cache'
import { apiRouter, clientsRouter, staticRouter, servicesRouter, webfingerRouter, activityPubRouter } from './server/controllers'
import { BadActorFollowScheduler } from './server/lib/schedulers/bad-actor-follow-scheduler'
import { RemoveOldJobsScheduler } from './server/lib/schedulers/remove-old-jobs-scheduler'

// ----------- Command line -----------

@@ -170,9 +171,8 @@ function onDatabaseInitDone () {
server.listen(port, () => {
VideosPreviewCache.Instance.init(CONFIG.CACHE.PREVIEWS.SIZE)
BadActorFollowScheduler.Instance.enable()

activitypubHttpJobScheduler.activate()
transcodingJobScheduler.activate()
RemoveOldJobsScheduler.Instance.enable()
JobQueue.Instance.init()

logger.info('Server listening on port %d', port)
logger.info('Web server: %s', CONFIG.WEBSERVER.URL)


+ 32
- 6
server/controllers/api/jobs.ts View File

@@ -1,22 +1,29 @@
import * as express from 'express'
import { ResultList } from '../../../shared'
import { Job, JobType, JobState } from '../../../shared/models'
import { UserRight } from '../../../shared/models/users'
import { getFormattedObjects } from '../../helpers/utils'
import { JobQueue } from '../../lib/job-queue'
import {
asyncMiddleware, authenticate, ensureUserHasRight, jobsSortValidator, setDefaultPagination,
asyncMiddleware,
authenticate,
ensureUserHasRight,
jobsSortValidator,
setDefaultPagination,
setDefaultSort
} from '../../middlewares'
import { paginationValidator } from '../../middlewares/validators'
import { JobModel } from '../../models/job/job'
import { listJobsValidator } from '../../middlewares/validators/jobs'

const jobsRouter = express.Router()

jobsRouter.get('/',
jobsRouter.get('/:state',
authenticate,
ensureUserHasRight(UserRight.MANAGE_JOBS),
paginationValidator,
jobsSortValidator,
setDefaultSort,
setDefaultPagination,
asyncMiddleware(listJobsValidator),
asyncMiddleware(listJobs)
)

@@ -29,7 +36,26 @@ export {
// ---------------------------------------------------------------------------

async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) {
const resultList = await JobModel.listForApi(req.query.start, req.query.count, req.query.sort)
const sort = req.query.sort === 'createdAt' ? 'asc' : 'desc'

const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort)
const total = await JobQueue.Instance.count(req.params.state)

const result: ResultList<any> = {
total,
data: jobs.map(j => formatJob(j.toJSON()))
}
return res.json(result)
}

return res.json(getFormattedObjects(resultList.data, resultList.total))
function formatJob (job: any): Job {
return {
id: job.id,
state: job.state as JobState,
type: job.type as JobType,
data: job.data,
error: job.error,
createdAt: new Date(parseInt(job.created_at, 10)),
updatedAt: new Date(parseInt(job.updated_at, 10))
}
}

+ 1
- 1
server/controllers/api/server/follows.ts View File

@@ -123,7 +123,7 @@ function follow (fromActor: ActorModel, targetActor: ActorModel) {
actorFollow.ActorFollower = fromActor

// Send a notification to remote server
await sendFollow(actorFollow, t)
await sendFollow(actorFollow)
})
}



+ 13
- 11
server/controllers/api/videos/index.ts View File

@@ -12,7 +12,7 @@ import {
} from '../../../initializers'
import { fetchRemoteVideoDescription, getVideoActivityPubUrl, shareVideoByServerAndChannel } from '../../../lib/activitypub'
import { sendCreateVideo, sendCreateViewToOrigin, sendCreateViewToVideoFollowers, sendUpdateVideo } from '../../../lib/activitypub/send'
import { transcodingJobScheduler } from '../../../lib/jobs/transcoding-job-scheduler'
import { JobQueue } from '../../../lib/job-queue'
import {
asyncMiddleware, authenticate, paginationValidator, setDefaultSort, setDefaultPagination, videosAddValidator, videosGetValidator,
videosRemoveValidator, videosSearchValidator, videosSortValidator, videosUpdateValidator
@@ -176,18 +176,9 @@ async function addVideo (req: express.Request, res: express.Response, videoPhysi
)
await Promise.all(tasks)

return sequelizeTypescript.transaction(async t => {
const videoCreated = await sequelizeTypescript.transaction(async t => {
const sequelizeOptions = { transaction: t }

if (CONFIG.TRANSCODING.ENABLED === true) {
// Put uuid because we don't have id auto incremented for now
const dataInput = {
videoUUID: video.uuid
}

await transcodingJobScheduler.createJob(t, 'videoFileOptimizer', dataInput)
}

const videoCreated = await video.save(sequelizeOptions)
// Do not forget to add video channel information to the created video
videoCreated.VideoChannel = res.locals.videoChannel
@@ -216,6 +207,17 @@ async function addVideo (req: express.Request, res: express.Response, videoPhysi

return videoCreated
})

if (CONFIG.TRANSCODING.ENABLED === true) {
// Put uuid because we don't have id auto incremented for now
const dataInput = {
videoUUID: videoCreated.uuid
}

await JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput })
}

return videoCreated
}

async function updateVideoRetryWrapper (req: express.Request, res: express.Response, next: express.NextFunction) {


+ 14
- 0
server/helpers/custom-validators/jobs.ts View File

@@ -0,0 +1,14 @@
import { JobState } from '../../../shared/models'
import { exists } from './misc'

const jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ]

function isValidJobState (value: JobState) {
return exists(value) && jobStates.indexOf(value) !== -1
}

// ---------------------------------------------------------------------------

export {
isValidJobState
}

+ 1
- 0
server/helpers/database-utils.ts View File

@@ -16,6 +16,7 @@ function retryTransactionWrapper <T> (
.catch(err => callback(err))
})
.catch(err => {
console.error(err)
logger.error(options.errorMessage, err)
throw err
})


+ 24
- 25
server/initializers/constants.ts View File

@@ -1,6 +1,6 @@
import { IConfig } from 'config'
import { dirname, join } from 'path'
import { JobCategory, JobState, VideoRateType } from '../../shared/models'
import { JobType, VideoRateType } from '../../shared/models'
import { ActivityPubActorType } from '../../shared/models/activitypub'
import { FollowState } from '../../shared/models/actors'
import { VideoPrivacy } from '../../shared/models/videos'
@@ -12,7 +12,7 @@ let config: IConfig = require('config')

// ---------------------------------------------------------------------------

const LAST_MIGRATION_VERSION = 175
const LAST_MIGRATION_VERSION = 180

// ---------------------------------------------------------------------------

@@ -26,7 +26,7 @@ const PAGINATION_COUNT_DEFAULT = 15
const SORTABLE_COLUMNS = {
USERS: [ 'id', 'username', 'createdAt' ],
ACCOUNTS: [ 'createdAt' ],
JOBS: [ 'id', 'createdAt' ],
JOBS: [ 'createdAt' ],
VIDEO_ABUSES: [ 'id', 'createdAt' ],
VIDEO_CHANNELS: [ 'id', 'name', 'updatedAt', 'createdAt' ],
VIDEOS: [ 'name', 'duration', 'createdAt', 'views', 'likes' ],
@@ -61,23 +61,20 @@ const REMOTE_SCHEME = {
WS: 'wss'
}

const JOB_STATES: { [ id: string ]: JobState } = {
PENDING: 'pending',
PROCESSING: 'processing',
ERROR: 'error',
SUCCESS: 'success'
}
const JOB_CATEGORIES: { [ id: string ]: JobCategory } = {
TRANSCODING: 'transcoding',
ACTIVITYPUB_HTTP: 'activitypub-http'
const JOB_ATTEMPTS: { [ id in JobType ]: number } = {
'activitypub-http-broadcast': 5,
'activitypub-http-unicast': 5,
'activitypub-http-fetcher': 5,
'video-file': 1
}
// How many maximum jobs we fetch from the database per cycle
const JOBS_FETCH_LIMIT_PER_CYCLE = {
transcoding: 10,
httpRequest: 20
const JOB_CONCURRENCY: { [ id in JobType ]: number } = {
'activitypub-http-broadcast': 1,
'activitypub-http-unicast': 5,
'activitypub-http-fetcher': 1,
'video-file': 1
}
// 1 minutes
let JOBS_FETCHING_INTERVAL = 60000
// 2 days
const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2

// 1 hour
let SCHEDULER_INTERVAL = 60000 * 60
@@ -96,6 +93,11 @@ const CONFIG = {
USERNAME: config.get<string>('database.username'),
PASSWORD: config.get<string>('database.password')
},
REDIS: {
HOSTNAME: config.get<string>('redis.hostname'),
PORT: config.get<string>('redis.port'),
AUTH: config.get<string>('redis.auth')
},
STORAGE: {
AVATARS_DIR: buildPath(config.get<string>('storage.avatars')),
LOG_DIR: buildPath(config.get<string>('storage.logs')),
@@ -284,7 +286,6 @@ const ACTIVITY_PUB = {
PUBLIC: 'https://www.w3.org/ns/activitystreams#Public',
COLLECTION_ITEMS_PER_PAGE: 10,
FETCH_PAGE_LIMIT: 100,
MAX_HTTP_ATTEMPT: 5,
URL_MIME_TYPES: {
VIDEO: Object.keys(VIDEO_MIMETYPE_EXT),
TORRENT: [ 'application/x-bittorrent' ],
@@ -358,7 +359,6 @@ const OPENGRAPH_AND_OEMBED_COMMENT = '<!-- open graph and oembed tags -->'
// Special constants for a test instance
if (isTestInstance() === true) {
ACTOR_FOLLOW_SCORE.BASE = 20
JOBS_FETCHING_INTERVAL = 1000
REMOTE_SCHEME.HTTP = 'http'
REMOTE_SCHEME.WS = 'ws'
STATIC_MAX_AGE = '0'
@@ -381,10 +381,8 @@ export {
CONFIG,
CONSTRAINTS_FIELDS,
EMBED_SIZE,
JOB_STATES,
JOBS_FETCH_LIMIT_PER_CYCLE,
JOBS_FETCHING_INTERVAL,
JOB_CATEGORIES,
JOB_CONCURRENCY,
JOB_ATTEMPTS,
LAST_MIGRATION_VERSION,
OAUTH_LIFETIME,
OPENGRAPH_AND_OEMBED_COMMENT,
@@ -408,7 +406,8 @@ export {
VIDEO_RATE_TYPES,
VIDEO_MIMETYPE_EXT,
AVATAR_MIMETYPE_EXT,
SCHEDULER_INTERVAL
SCHEDULER_INTERVAL,
JOB_COMPLETED_LIFETIME
}

// ---------------------------------------------------------------------------


+ 0
- 2
server/initializers/database.ts View File

@@ -9,7 +9,6 @@ import { ActorModel } from '../models/activitypub/actor'
import { ActorFollowModel } from '../models/activitypub/actor-follow'
import { ApplicationModel } from '../models/application/application'
import { AvatarModel } from '../models/avatar/avatar'
import { JobModel } from '../models/job/job'
import { OAuthClientModel } from '../models/oauth/oauth-client'
import { OAuthTokenModel } from '../models/oauth/oauth-token'
import { ServerModel } from '../models/server/server'
@@ -61,7 +60,6 @@ async function initDatabaseModels (silent: boolean) {
ActorFollowModel,
AvatarModel,
AccountModel,
JobModel,
OAuthClientModel,
OAuthTokenModel,
ServerModel,


+ 2
- 3
server/initializers/migrations/0100-activitypub.ts View File

@@ -1,11 +1,10 @@
import { values } from 'lodash'
import * as Sequelize from 'sequelize'
import { createPrivateAndPublicKeys } from '../../helpers/peertube-crypto'
import { shareVideoByServerAndChannel } from '../../lib/activitypub/share'
import { getVideoActivityPubUrl, getVideoChannelActivityPubUrl } from '../../lib/activitypub/url'
import { createLocalAccountWithoutKeys } from '../../lib/user'
import { ApplicationModel } from '../../models/application/application'
import { JOB_CATEGORIES, SERVER_ACTOR_NAME } from '../constants'
import { SERVER_ACTOR_NAME } from '../constants'

async function up (utils: {
transaction: Sequelize.Transaction,
@@ -161,7 +160,7 @@ async function up (utils: {

{
const data = {
type: Sequelize.ENUM(values(JOB_CATEGORIES)),
type: Sequelize.ENUM('transcoding', 'activitypub-http'),
defaultValue: 'transcoding',
allowNull: false
}


+ 18
- 0
server/initializers/migrations/0180-job-table-delete.ts View File

@@ -0,0 +1,18 @@
import * as Sequelize from 'sequelize'

async function up (utils: {
transaction: Sequelize.Transaction,
queryInterface: Sequelize.QueryInterface,
sequelize: Sequelize.Sequelize
}): Promise<void> {
await utils.queryInterface.dropTable('job')
}

function down (options) {
throw new Error('Not implemented.')
}

export {
up,
down
}

+ 34
- 25
server/lib/activitypub/actor.ts View File

@@ -64,7 +64,11 @@ async function getOrCreateActorAndServerAndModel (actorUrl: string, recurseIfNee
actor = await retryTransactionWrapper(saveActorAndServerAndModelIfNotExist, options)
}

return refreshActorIfNeeded(actor)
const options = {
arguments: [ actor ],
errorMessage: 'Cannot refresh actor if needed with many retries.'
}
return retryTransactionWrapper(refreshActorIfNeeded, options)
}

function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string, uuid?: string) {
@@ -325,38 +329,43 @@ async function saveVideoChannel (actor: ActorModel, result: FetchRemoteActorResu
async function refreshActorIfNeeded (actor: ActorModel) {
if (!actor.isOutdated()) return actor

const actorUrl = await getUrlFromWebfinger(actor.preferredUsername, actor.getHost())
const result = await fetchRemoteActor(actorUrl)
if (result === undefined) {
logger.warn('Cannot fetch remote actor in refresh actor.')
return actor
}

return sequelizeTypescript.transaction(async t => {
updateInstanceWithAnother(actor, result.actor)

if (result.avatarName !== undefined) {
await updateActorAvatarInstance(actor, result.avatarName, t)
try {
const actorUrl = await getUrlFromWebfinger(actor.preferredUsername, actor.getHost())
const result = await fetchRemoteActor(actorUrl)
if (result === undefined) {
logger.warn('Cannot fetch remote actor in refresh actor.')
return actor
}

// Force update
actor.setDataValue('updatedAt', new Date())
await actor.save({ transaction: t })
return sequelizeTypescript.transaction(async t => {
updateInstanceWithAnother(actor, result.actor)

if (actor.Account) {
await actor.save({ transaction: t })
if (result.avatarName !== undefined) {
await updateActorAvatarInstance(actor, result.avatarName, t)
}

actor.Account.set('name', result.name)
await actor.Account.save({ transaction: t })
} else if (actor.VideoChannel) {
// Force update
actor.setDataValue('updatedAt', new Date())
await actor.save({ transaction: t })

actor.VideoChannel.set('name', result.name)
await actor.VideoChannel.save({ transaction: t })
}
if (actor.Account) {
await actor.save({ transaction: t })

actor.Account.set('name', result.name)
await actor.Account.save({ transaction: t })
} else if (actor.VideoChannel) {
await actor.save({ transaction: t })

actor.VideoChannel.set('name', result.name)
await actor.VideoChannel.save({ transaction: t })
}

return actor
})
} catch (err) {
logger.warn('Cannot refresh actor.', err)
return actor
})
}
}

function normalizeActor (actor: any) {


+ 4
- 5
server/lib/activitypub/fetch.ts View File

@@ -1,13 +1,12 @@
import { Transaction } from 'sequelize'
import { ActorModel } from '../../models/activitypub/actor'
import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../jobs/activitypub-http-job-scheduler'
import { JobQueue } from '../job-queue'

async function addFetchOutboxJob (actor: ActorModel, t: Transaction) {
const jobPayload: ActivityPubHttpPayload = {
async function addFetchOutboxJob (actor: ActorModel) {
const payload = {
uris: [ actor.outboxUrl ]
}

return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpFetcherHandler', jobPayload)
return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
}

export {


+ 1
- 1
server/lib/activitypub/process/process-accept.ts View File

@@ -26,6 +26,6 @@ async function processAccept (actor: ActorModel, targetActor: ActorModel) {
if (follow.state !== 'accepted') {
follow.set('state', 'accepted')
await follow.save()
await addFetchOutboxJob(targetActor, undefined)
await addFetchOutboxJob(targetActor)
}
}

+ 1
- 1
server/lib/activitypub/process/process-follow.ts View File

@@ -63,7 +63,7 @@ async function follow (actor: ActorModel, targetActorURL: string) {
actorFollow.ActorFollowing = targetActor

// Target sends to actor he accepted the follow request
return sendAccept(actorFollow, t)
return sendAccept(actorFollow)
})

logger.info('Actor %s is followed by actor %s.', targetActorURL, actor.url)


+ 12
- 14
server/lib/activitypub/send/misc.ts View File

@@ -7,7 +7,7 @@ import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { VideoModel } from '../../../models/video/video'
import { VideoCommentModel } from '../../../models/video/video-comment'
import { VideoShareModel } from '../../../models/video/video-share'
import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../../jobs/activitypub-http-job-scheduler'
import { JobQueue } from '../../job-queue'

async function forwardActivity (
activity: Activity,
@@ -35,12 +35,11 @@ async function forwardActivity (

logger.debug('Creating forwarding job.', { uris })

const jobPayload: ActivityPubHttpPayload = {
const payload = {
uris,
body: activity
}

return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload)
return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
}

async function broadcastToFollowers (
@@ -51,44 +50,43 @@ async function broadcastToFollowers (
actorsException: ActorModel[] = []
) {
const uris = await computeFollowerUris(toActorFollowers, actorsException, t)
return broadcastTo(uris, data, byActor, t)
return broadcastTo(uris, data, byActor)
}

async function broadcastToActors (
data: any,
byActor: ActorModel,
toActors: ActorModel[],
t: Transaction,
actorsException: ActorModel[] = []
) {
const uris = await computeUris(toActors, actorsException)
return broadcastTo(uris, data, byActor, t)
return broadcastTo(uris, data, byActor)
}

async function broadcastTo (uris: string[], data: any, byActor: ActorModel, t: Transaction) {
async function broadcastTo (uris: string[], data: any, byActor: ActorModel) {
if (uris.length === 0) return undefined

logger.debug('Creating broadcast job.', { uris })

const jobPayload: ActivityPubHttpPayload = {
const payload = {
uris,
signatureActorId: byActor.id,
body: data
}

return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpBroadcastHandler', jobPayload)
return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
}

async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string, t: Transaction) {
async function unicastTo (data: any, byActor: ActorModel, toActorUrl: string) {
logger.debug('Creating unicast job.', { uri: toActorUrl })

const jobPayload: ActivityPubHttpPayload = {
uris: [ toActorUrl ],
const payload = {
uri: toActorUrl,
signatureActorId: byActor.id,
body: data
}

return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpUnicastHandler', jobPayload)
return JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
}

function getOriginVideoAudience (video: VideoModel, actorsInvolvedInVideo: ActorModel[]) {


+ 2
- 3
server/lib/activitypub/send/send-accept.ts View File

@@ -1,4 +1,3 @@
import { Transaction } from 'sequelize'
import { ActivityAccept, ActivityFollow } from '../../../../shared/models/activitypub'
import { ActorModel } from '../../../models/activitypub/actor'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
@@ -6,7 +5,7 @@ import { getActorFollowAcceptActivityPubUrl, getActorFollowActivityPubUrl } from
import { unicastTo } from './misc'
import { followActivityData } from './send-follow'

async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) {
async function sendAccept (actorFollow: ActorFollowModel) {
const follower = actorFollow.ActorFollower
const me = actorFollow.ActorFollowing

@@ -16,7 +15,7 @@ async function sendAccept (actorFollow: ActorFollowModel, t: Transaction) {
const url = getActorFollowAcceptActivityPubUrl(actorFollow)
const data = acceptActivityData(url, me, followData)

return unicastTo(data, me, follower.inboxUrl, t)
return unicastTo(data, me, follower.inboxUrl)
}

// ---------------------------------------------------------------------------


+ 1
- 1
server/lib/activitypub/send/send-announce.ts View File

@@ -42,7 +42,7 @@ async function sendVideoAnnounceToOrigin (byActor: ActorModel, video: VideoModel
const audience = getOriginVideoAudience(video, actorsInvolvedInVideo)
const data = await createActivityData(url, byActor, announcedActivity, t, audience)

return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t)
return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
}

async function announceActivityData (


+ 14
- 8
server/lib/activitypub/send/send-create.ts View File

@@ -8,8 +8,14 @@ import { VideoAbuseModel } from '../../../models/video/video-abuse'
import { VideoCommentModel } from '../../../models/video/video-comment'
import { getVideoAbuseActivityPubUrl, getVideoDislikeActivityPubUrl, getVideoViewActivityPubUrl } from '../url'
import {
audiencify, broadcastToActors, broadcastToFollowers, getActorsInvolvedInVideo, getAudience, getObjectFollowersAudience,
getOriginVideoAudience, getOriginVideoCommentAudience,
audiencify,
broadcastToActors,
broadcastToFollowers,
getActorsInvolvedInVideo,
getAudience,
getObjectFollowersAudience,
getOriginVideoAudience,
getOriginVideoCommentAudience,
unicastTo
} from './misc'

@@ -31,7 +37,7 @@ async function sendVideoAbuse (byActor: ActorModel, videoAbuse: VideoAbuseModel,
const audience = { to: [ video.VideoChannel.Account.Actor.url ], cc: [] }
const data = await createActivityData(url, byActor, videoAbuse.toActivityPubObject(), t, audience)

return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t)
return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
}

async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Transaction) {
@@ -47,13 +53,13 @@ async function sendCreateVideoCommentToOrigin (comment: VideoCommentModel, t: Tr

// This was a reply, send it to the parent actors
const actorsException = [ byActor ]
await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException)
await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException)

// Broadcast to our followers
await broadcastToFollowers(data, byActor, [ byActor ], t)

// Send to origin
return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl, t)
return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl)
}

async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentModel, t: Transaction) {
@@ -69,7 +75,7 @@ async function sendCreateVideoCommentToVideoFollowers (comment: VideoCommentMode

// This was a reply, send it to the parent actors
const actorsException = [ byActor ]
await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), t, actorsException)
await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException)

// Broadcast to our followers
await broadcastToFollowers(data, byActor, [ byActor ], t)
@@ -86,7 +92,7 @@ async function sendCreateViewToOrigin (byActor: ActorModel, video: VideoModel, t
const audience = getOriginVideoAudience(video, actorsInvolvedInVideo)
const data = await createActivityData(url, byActor, viewActivityData, t, audience)

return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t)
return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
}

async function sendCreateViewToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {
@@ -111,7 +117,7 @@ async function sendCreateDislikeToOrigin (byActor: ActorModel, video: VideoModel
const audience = getOriginVideoAudience(video, actorsInvolvedInVideo)
const data = await createActivityData(url, byActor, dislikeActivityData, t, audience)

return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t)
return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
}

async function sendCreateDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {


+ 2
- 3
server/lib/activitypub/send/send-follow.ts View File

@@ -1,18 +1,17 @@
import { Transaction } from 'sequelize'
import { ActivityFollow } from '../../../../shared/models/activitypub'
import { ActorModel } from '../../../models/activitypub/actor'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { getActorFollowActivityPubUrl } from '../url'
import { unicastTo } from './misc'

function sendFollow (actorFollow: ActorFollowModel, t: Transaction) {
function sendFollow (actorFollow: ActorFollowModel) {
const me = actorFollow.ActorFollower
const following = actorFollow.ActorFollowing

const url = getActorFollowActivityPubUrl(actorFollow)
const data = followActivityData(url, me, following)

return unicastTo(data, me, following.inboxUrl, t)
return unicastTo(data, me, following.inboxUrl)
}

function followActivityData (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow {


+ 1
- 1
server/lib/activitypub/send/send-like.ts View File

@@ -20,7 +20,7 @@ async function sendLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Tran
const audience = getOriginVideoAudience(video, accountsInvolvedInVideo)
const data = await likeActivityData(url, byActor, video, t, audience)

return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t)
return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
}

async function sendLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {


+ 4
- 10
server/lib/activitypub/send/send-undo.ts View File

@@ -1,11 +1,5 @@
import { Transaction } from 'sequelize'
import {
ActivityAudience,
ActivityCreate,
ActivityFollow,
ActivityLike,
ActivityUndo
} from '../../../../shared/models/activitypub'
import { ActivityAudience, ActivityCreate, ActivityFollow, ActivityLike, ActivityUndo } from '../../../../shared/models/activitypub'
import { ActorModel } from '../../../models/activitypub/actor'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { VideoModel } from '../../../models/video/video'
@@ -33,7 +27,7 @@ async function sendUndoFollow (actorFollow: ActorFollowModel, t: Transaction) {
const object = followActivityData(followUrl, me, following)
const data = await undoActivityData(undoUrl, me, object, t)

return unicastTo(data, me, following.inboxUrl, t)
return unicastTo(data, me, following.inboxUrl)
}

async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t: Transaction) {
@@ -45,7 +39,7 @@ async function sendUndoLikeToOrigin (byActor: ActorModel, video: VideoModel, t:
const object = await likeActivityData(likeUrl, byActor, video, t)
const data = await undoActivityData(undoUrl, byActor, object, t, audience)

return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t)
return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
}

async function sendUndoLikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {
@@ -72,7 +66,7 @@ async function sendUndoDislikeToOrigin (byActor: ActorModel, video: VideoModel,

const data = await undoActivityData(undoUrl, byActor, object, t, audience)

return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl, t)
return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl)
}

async function sendUndoDislikeToVideoFollowers (byActor: ActorModel, video: VideoModel, t: Transaction) {


server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts → server/lib/job-queue/handlers/activitypub-http-broadcast.ts View File

@@ -1,10 +1,19 @@
import * as kue from 'kue'
import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler'
import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'

async function process (payload: ActivityPubHttpPayload, jobId: number) {
logger.info('Processing ActivityPub broadcast in job %d.', jobId)
export type ActivitypubHttpBroadcastPayload = {
uris: string[]
signatureActorId?: number
body: any
}

async function processActivityPubHttpBroadcast (job: kue.Job) {
logger.info('Processing ActivityPub broadcast in job %d.', job.id)

const payload = job.data as ActivitypubHttpBroadcastPayload

const body = await computeBody(payload)
const httpSignatureOptions = await buildSignedRequestOptions(payload)
@@ -26,28 +35,15 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) {
await doRequest(options)
goodUrls.push(uri)
} catch (err) {
const isRetryingLater = await maybeRetryRequestLater(err, payload, uri)
if (isRetryingLater === false) badUrls.push(uri)
badUrls.push(uri)
}
}

return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined)
}

function onError (err: Error, jobId: number) {
logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err)
return Promise.resolve()
}

function onSuccess (jobId: number) {
logger.info('Job %d is a success.', jobId)
return Promise.resolve()
}

// ---------------------------------------------------------------------------

export {
process,
onError,
onSuccess
processActivityPubHttpBroadcast
}

server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts → server/lib/job-queue/handlers/activitypub-http-fetcher.ts View File

@@ -1,11 +1,18 @@
import * as kue from 'kue'
import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
import { ACTIVITY_PUB } from '../../../initializers'
import { processActivities } from '../../activitypub/process'
import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler'
import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'

async function process (payload: ActivityPubHttpPayload, jobId: number) {
logger.info('Processing ActivityPub fetcher in job %d.', jobId)
export type ActivitypubHttpFetcherPayload = {
uris: string[]
}

async function processActivityPubHttpFetcher (job: kue.Job) {
logger.info('Processing ActivityPub fetcher in job %d.', job.id)

const payload = job.data as ActivitypubHttpBroadcastPayload

const options = {
method: 'GET',
@@ -49,20 +56,8 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) {
}
}

function onError (err: Error, jobId: number) {
logger.error('Error when fetcher ActivityPub request in job %d.', jobId, err)
return Promise.resolve()
}

function onSuccess (jobId: number) {
logger.info('Job %d is a success.', jobId)
return Promise.resolve()
}

// ---------------------------------------------------------------------------

export {
process,
onError,
onSuccess
processActivityPubHttpFetcher
}

+ 43
- 0
server/lib/job-queue/handlers/activitypub-http-unicast.ts View File

@@ -0,0 +1,43 @@
import * as kue from 'kue'
import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'

export type ActivitypubHttpUnicastPayload = {
uri: string
signatureActorId?: number
body: any
}

async function processActivityPubHttpUnicast (job: kue.Job) {
logger.info('Processing ActivityPub unicast in job %d.', job.id)

const payload = job.data as ActivitypubHttpUnicastPayload
const uri = payload.uri

const body = await computeBody(payload)
const httpSignatureOptions = await buildSignedRequestOptions(payload)

const options = {
method: 'POST',
uri,
json: body,
httpSignature: httpSignatureOptions
}

try {
await doRequest(options)
ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined)
} catch (err) {
ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined)

throw err
}
}

// ---------------------------------------------------------------------------

export {
processActivityPubHttpUnicast
}

+ 39
- 0
server/lib/job-queue/handlers/utils/activitypub-http-utils.ts View File

@@ -0,0 +1,39 @@
import { buildSignedActivity } from '../../../../helpers/activitypub'
import { getServerActor } from '../../../../helpers/utils'
import { ActorModel } from '../../../../models/activitypub/actor'

async function computeBody (payload: { body: any, signatureActorId?: number }) {
let body = payload.body

if (payload.signatureActorId) {
const actorSignature = await ActorModel.load(payload.signatureActorId)
if (!actorSignature) throw new Error('Unknown signature actor id.')
body = await buildSignedActivity(actorSignature, payload.body)
}

return body
}

async function buildSignedRequestOptions (payload: { signatureActorId?: number }) {
let actor: ActorModel
if (payload.signatureActorId) {
actor = await ActorModel.load(payload.signatureActorId)
if (!actor) throw new Error('Unknown signature actor id.')
} else {
// We need to sign the request, so use the server
actor = await getServerActor()
}

const keyId = actor.getWebfingerUrl()
return {
algorithm: 'rsa-sha256',
authorizationHeaderName: 'Signature',
keyId,
key: actor.privateKey
}
}

export {
computeBody,
buildSignedRequestOptions
}

server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts → server/lib/job-queue/handlers/video-file.ts View File

@@ -1,38 +1,60 @@
import * as Bluebird from 'bluebird'
import * as kue from 'kue'
import { VideoResolution } from '../../../../shared'
import { VideoPrivacy } from '../../../../shared/models/videos'
import { logger } from '../../../helpers/logger'
import { computeResolutionsToTranscode } from '../../../helpers/utils'
import { sequelizeTypescript } from '../../../initializers'
import { JobModel } from '../../../models/job/job'
import { VideoModel } from '../../../models/video/video'
import { shareVideoByServerAndChannel } from '../../activitypub'
import { sendCreateVideo } from '../../activitypub/send'
import { JobScheduler } from '../job-scheduler'
import { TranscodingJobPayload } from './transcoding-job-scheduler'
import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send'
import { JobQueue } from '../job-queue'

async function process (data: TranscodingJobPayload, jobId: number) {
const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID)
export type VideoFilePayload = {
videoUUID: string
resolution?: VideoResolution
}

async function processVideoFile (job: kue.Job) {
const payload = job.data as VideoFilePayload
logger.info('Processing video file in job %d.', job.id)

const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(payload.videoUUID)
// No video, maybe deleted?
if (!video) {
logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
logger.info('Do not process job %d, video does not exist.', job.id, { videoUUID: video.uuid })
return undefined
}

await video.optimizeOriginalVideofile()
// Transcoding in other resolution
if (payload.resolution) {
await video.transcodeOriginalVideofile(payload.resolution)
await onVideoFileTranscoderSuccess(video)
} else {
await video.optimizeOriginalVideofile()
await onVideoFileOptimizerSuccess(video)
}

return video
}

function onError (err: Error, jobId: number) {
logger.error('Error when optimized video file in job %d.', jobId, err)
return Promise.resolve()
async function onVideoFileTranscoderSuccess (video: VideoModel) {
if (video === undefined) return undefined

// Maybe the video changed in database, refresh it
const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
// Video does not exist anymore
if (!videoDatabase) return undefined

if (video.privacy !== VideoPrivacy.PRIVATE) {
await sendUpdateVideo(video, undefined)
}

return undefined
}

async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobScheduler<TranscodingJobPayload, VideoModel>) {
async function onVideoFileOptimizerSuccess (video: VideoModel) {
if (video === undefined) return undefined

logger.info('Job %d is a success.', jobId)

// Maybe the video changed in database, refresh it
const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
// Video does not exist anymore
@@ -56,7 +78,7 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch
if (resolutionsEnabled.length !== 0) {
try {
await sequelizeTypescript.transaction(async t => {
const tasks: Bluebird<JobModel>[] = []
const tasks: Promise<any>[] = []

for (const resolution of resolutionsEnabled) {
const dataInput = {
@@ -64,7 +86,7 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch
resolution
}

const p = jobScheduler.createJob(t, 'videoFileTranscoder', dataInput)
const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput })
tasks.push(p)
}

@@ -84,7 +106,5 @@ async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobSch
// ---------------------------------------------------------------------------

export {
process,
onError,
onSuccess
processVideoFile
}

+ 1
- 0
server/lib/job-queue/index.ts View File

@@ -0,0 +1 @@
export * from './job-queue'

+ 124
- 0
server/lib/job-queue/job-queue.ts View File

@@ -0,0 +1,124 @@
import * as kue from 'kue'
import { JobType, JobState } from '../../../shared/models'
import { logger } from '../../helpers/logger'
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { processVideoFile, VideoFilePayload } from './handlers/video-file'

type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
{ type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
{ type: 'video-file', payload: VideoFilePayload }

const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
'activitypub-http-broadcast': processActivityPubHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast,
'activitypub-http-fetcher': processActivityPubHttpFetcher,
'video-file': processVideoFile
}

class JobQueue {

private static instance: JobQueue

private jobQueue: kue.Queue
private initialized = false

private constructor () {}

init () {
// Already initialized
if (this.initialized === true) return
this.initialized = true

this.jobQueue = kue.createQueue({
prefix: 'q-' + CONFIG.WEBSERVER.HOST,
redis: {
host: CONFIG.REDIS.HOSTNAME,
port: CONFIG.REDIS.PORT,
auth: CONFIG.REDIS.AUTH
}
})

this.jobQueue.on('error', err => {
logger.error('Error in job queue.', err)
process.exit(-1)
})
this.jobQueue.watchStuckJobs(5000)

for (const handlerName of Object.keys(handlers)) {
this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
try {
const res = await handlers[ handlerName ](job)
return done(null, res)
} catch (err) {
return done(err)
}
})
}
}

createJob (obj: CreateJobArgument, priority = 'normal') {
return new Promise((res, rej) => {
this.jobQueue
.create(obj.type, obj.payload)
.priority(priority)
.attempts(JOB_ATTEMPTS[obj.type])
.backoff({ type: 'exponential' })
.save(err => {
if (err) return rej(err)

return res()
})
})
}

listForApi (state: JobState, start: number, count: number, sort: string) {
return new Promise<kue.Job[]>((res, rej) => {
kue.Job.rangeByState(state, start, count, sort, (err, jobs) => {
if (err) return rej(err)

return res(jobs)
})
})
}

count (state: JobState) {
return new Promise<number>((res, rej) => {
this.jobQueue[state + 'Count']((err, total) => {
if (err) return rej(err)

return res(total)
})
})
}

removeOldJobs () {
const now = new Date().getTime()
kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
if (err) {
logger.error('Cannot get jobs when removing old jobs.', err)
return
}

for (const job of jobs) {
if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
job.remove()
}
}
})
}

static get Instance () {
return this.instance || (this.instance = new this())
}
}

// ---------------------------------------------------------------------------

export {
JobQueue
}

+ 0
- 94
server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts View File

@@ -1,94 +0,0 @@
import { JobCategory } from '../../../../shared'
import { buildSignedActivity } from '../../../helpers/activitypub'
import { logger } from '../../../helpers/logger'
import { getServerActor } from '../../../helpers/utils'
import { ACTIVITY_PUB } from '../../../initializers'
import { ActorModel } from '../../../models/activitypub/actor'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { JobHandler, JobScheduler } from '../job-scheduler'

import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler'
import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler'
import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler'

type ActivityPubHttpPayload = {
uris: string[]
signatureActorId?: number
body?: any
attemptNumber?: number
}

const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = {
activitypubHttpBroadcastHandler,
activitypubHttpUnicastHandler,
activitypubHttpFetcherHandler
}
const jobCategory: JobCategory = 'activitypub-http'

const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers)

async function maybeRetryRequestLater (err: Error, payload: ActivityPubHttpPayload, uri: string) {
logger.warn('Cannot make request to %s.', uri, err)

let attemptNumber = payload.attemptNumber || 1
attemptNumber += 1

if (attemptNumber < ACTIVITY_PUB.MAX_HTTP_ATTEMPT) {
logger.debug('Retrying request to %s (attempt %d/%d).', uri, attemptNumber, ACTIVITY_PUB.MAX_HTTP_ATTEMPT, err)

const actor = await ActorFollowModel.loadByFollowerInbox(uri, undefined)
if (!actor) {
logger.debug('Actor %s is not a follower, do not retry the request.', uri)
return false
}

const newPayload = Object.assign(payload, {
uris: [ uri ],
attemptNumber
})
await activitypubHttpJobScheduler.createJob(undefined, 'activitypubHttpUnicastHandler', newPayload)

return true
}

return false
}

async function computeBody (payload: ActivityPubHttpPayload) {
let body = payload.body

if (payload.signatureActorId) {
const actorSignature = await ActorModel.load(payload.signatureActorId)
if (!actorSignature) throw new Error('Unknown signature actor id.')
body = await buildSignedActivity(actorSignature, payload.body)
}

return body
}

async function buildSignedRequestOptions (payload: ActivityPubHttpPayload) {
let actor: ActorModel
if (payload.signatureActorId) {
actor = await ActorModel.load(payload.signatureActorId)
if (!actor) throw new Error('Unknown signature actor id.')
} else {
// We need to sign the request, so use the server
actor = await getServerActor()
}

const keyId = actor.getWebfingerUrl()
return {
algorithm: 'rsa-sha256',
authorizationHeaderName: 'Signature',
keyId,
key: actor.privateKey
}
}

export {
ActivityPubHttpPayload,
activitypubHttpJobScheduler,
maybeRetryRequestLater,
computeBody,
buildSignedRequestOptions
}

+ 0
- 50
server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts View File

@@ -1,50 +0,0 @@
import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler'

async function process (payload: ActivityPubHttpPayload, jobId: number) {
logger.info('Processing ActivityPub unicast in job %d.', jobId)

const uri = payload.uris[0]

const body = await computeBody(payload)
const httpSignatureOptions = await buildSignedRequestOptions(payload)

const options = {
method: 'POST',
uri,
json: body,
httpSignature: httpSignatureOptions
}

try {
await doRequest(options)
ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined)
} catch (err) {
const isRetryingLater = await maybeRetryRequestLater(err, payload, uri)
if (isRetryingLater === false) {
ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined)
}

throw err
}
}

function onError (err: Error, jobId: number) {
logger.error('Error when sending ActivityPub request in job %d.', jobId, err)
return Promise.resolve()
}

function onSuccess (jobId: number) {
logger.info('Job %d is a success.', jobId)
return Promise.resolve()
}

// ---------------------------------------------------------------------------

export {
process,
onError,
onSuccess
}

+ 0
- 1
server/lib/jobs/activitypub-http-job-scheduler/index.ts View File

@@ -1 +0,0 @@
export * from './activitypub-http-job-scheduler'

+ 0
- 2
server/lib/jobs/index.ts View File

@@ -1,2 +0,0 @@
export * from './activitypub-http-job-scheduler'
export * from './transcoding-job-scheduler'

+ 0
- 144
server/lib/jobs/job-scheduler.ts View File

@@ -1,144 +0,0 @@
import { AsyncQueue, forever, queue } from 'async'
import * as Sequelize from 'sequelize'
import { JobCategory } from '../../../shared'
import { logger } from '../../helpers/logger'
import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers'
import { JobModel } from '../../models/job/job'

export interface JobHandler<P, T> {
process (data: object, jobId: number): Promise<T>
onError (err: Error, jobId: number)
onSuccess (jobId: number, jobResult: T, jobScheduler: JobScheduler<P, T>): Promise<any>
}
type JobQueueCallback = (err: Error) => void

class JobScheduler<P, T> {

constructor (
private jobCategory: JobCategory,
private jobHandlers: { [ id: string ]: JobHandler<P, T> }
) {}

async activate () {
const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory]

logger.info('Jobs scheduler %s activated.', this.jobCategory)

const jobsQueue = queue<JobModel, JobQueueCallback>(this.processJob.bind(this))

// Finish processing jobs from a previous start
const state = JOB_STATES.PROCESSING
try {
const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)

this.enqueueJobs(jobsQueue, jobs)
} catch (err) {
logger.error('Cannot list pending jobs.', err)
}

forever(
async next => {
if (jobsQueue.length() !== 0) {
// Finish processing the queue first
return setTimeout(next, JOBS_FETCHING_INTERVAL)
}

const state = JOB_STATES.PENDING
try {
const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)

this.enqueueJobs(jobsQueue, jobs)
} catch (err) {
logger.error('Cannot list pending jobs.', err)
}

// Optimization: we could use "drain" from queue object
return setTimeout(next, JOBS_FETCHING_INTERVAL)
},

err => logger.error('Error in job scheduler queue.', err)
)
}

createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: P) {
const createQuery = {
state: JOB_STATES.PENDING,
category: this.jobCategory,
handlerName,
handlerInputData
}

const options = { transaction }

return JobModel.create(createQuery, options)
}

private enqueueJobs (jobsQueue: AsyncQueue<JobModel>, jobs: JobModel[]) {
jobs.forEach(job => jobsQueue.push(job))
}

private async processJob (job: JobModel, callback: (err: Error) => void) {
const jobHandler = this.jobHandlers[job.handlerName]
if (jobHandler === undefined) {
const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id
logger.error(errorString)

const error = new Error(errorString)
await this.onJobError(jobHandler, job, error)
return callback(error)
}

logger.info('Processing job %d with handler %s.', job.id, job.handlerName)

job.state = JOB_STATES.PROCESSING
await job.save()

try {
const result: T = await jobHandler.process(job.handlerInputData, job.id)
await this.onJobSuccess(jobHandler, job, result)
} catch (err) {
logger.error('Error in job handler %s.', job.handlerName, err)

try {
await this.onJobError(jobHandler, job, err)
} catch (innerErr) {
this.cannotSaveJobError(innerErr)
return callback(innerErr)
}
}

return callback(null)
}

private async onJobError (jobHandler: JobHandler<P, T>, job: JobModel, err: Error) {
job.state = JOB_STATES.ERROR

try {
await job.save()
if (jobHandler) await jobHandler.onError(err, job.id)
} catch (err) {
this.cannotSaveJobError(err)
}
}

private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobModel, jobResult: T) {
job.state = JOB_STATES.SUCCESS

try {
await job.save()
await jobHandler.onSuccess(job.id, jobResult, this)
} catch (err) {
this.cannotSaveJobError(err)
}
}

private cannotSaveJobError (err: Error) {
logger.error('Cannot save new job state.', err)
}
}

// ---------------------------------------------------------------------------

export {
JobScheduler
}

+ 0
- 1
server/lib/jobs/transcoding-job-scheduler/index.ts View File

@@ -1 +0,0 @@
export * from './transcoding-job-scheduler'

+ 0
- 23
server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts View File

@@ -1,23 +0,0 @@
import { JobCategory } from '../../../../shared'
import { VideoModel } from '../../../models/video/video'
import { JobHandler, JobScheduler } from '../job-scheduler'

import * as videoFileOptimizer from './video-file-optimizer-handler'
import * as videoFileTranscoder from './video-file-transcoder-handler'

type TranscodingJobPayload = {
videoUUID: string
resolution?: number
}
const jobHandlers: { [ handlerName: string ]: JobHandler<TranscodingJobPayload, VideoModel> } = {
videoFileOptimizer,
videoFileTranscoder
}
const jobCategory: JobCategory = 'transcoding'

const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers)

export {
TranscodingJobPayload,
transcodingJobScheduler
}

+ 0
- 48
server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts View File

@@ -1,48 +0,0 @@
import { VideoResolution } from '../../../../shared'
import { VideoPrivacy } from '../../../../shared/models/videos'
import { logger } from '../../../helpers/logger'
import { VideoModel } from '../../../models/video/video'
import { sendUpdateVideo } from '../../activitypub/send'

async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) {
const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID)
// No video, maybe deleted?
if (!video) {
logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
return undefined
}

await video.transcodeOriginalVideofile(data.resolution)

return video
}

function onError (err: Error, jobId: number) {
logger.error('Error when transcoding video file in job %d.', jobId, err)
return Promise.resolve()
}

async function onSuccess (jobId: number, video: VideoModel) {
if (video === undefined) return undefined

logger.info('Job %d is a success.', jobId)

// Maybe the video changed in database, refresh it
const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
// Video does not exist anymore
if (!videoDatabase) return undefined

if (video.privacy !== VideoPrivacy.PRIVATE) {
await sendUpdateVideo(video, undefined)
}

return undefined
}

// ---------------------------------------------------------------------------

export {
process,
onError,
onSuccess
}

+ 19
- 0
server/lib/schedulers/remove-old-jobs-scheduler.ts View File

@@ -0,0 +1,19 @@
import { JobQueue } from '../job-queue'
import { AbstractScheduler } from './abstract-scheduler'

export class RemoveOldJobsScheduler extends AbstractScheduler {

private static instance: AbstractScheduler

private constructor () {
super()
}

async execute () {
JobQueue.Instance.removeOldJobs()
}

static get Instance () {
return this.instance || (this.instance = new this())
}
}

+ 23
- 0
server/middlewares/validators/jobs.ts View File

@@ -0,0 +1,23 @@
import * as express from 'express'
import { param } from 'express-validator/check'
import { isValidJobState } from '../../helpers/custom-validators/jobs'
import { logger } from '../../helpers/logger'
import { areValidationErrors } from './utils'

const listJobsValidator = [
param('state').custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'),

async (req: express.Request, res: express.Response, next: express.NextFunction) => {
logger.debug('Checking listJobsValidator parameters.', { parameters: req.params })

if (areValidationErrors(req, res)) return

return next()
}
]

// ---------------------------------------------------------------------------

export {
listJobsValidator
}

+ 0
- 80
server/models/job/job.ts View File

@@ -1,80 +0,0 @@
import { values } from 'lodash'
import { AllowNull, Column, CreatedAt, DataType, Model, Table, UpdatedAt } from 'sequelize-typescript'
import { JobCategory, JobState } from '../../../shared/models'
import { JOB_CATEGORIES, JOB_STATES } from '../../initializers'
import { getSort } from '../utils'

@Table({
tableName: 'job',
indexes: [
{
fields: [ 'state', 'category' ]
}
]
})
export class JobModel extends Model<JobModel> {
@AllowNull(false)
@Column(DataType.ENUM(values(JOB_STATES)))
state: JobState

@AllowNull(false)
@Column(DataType.ENUM(values(JOB_CATEGORIES)))
category: JobCategory

@AllowNull(false)
@Column
handlerName: string

@AllowNull(true)
@Column(DataType.JSON)
handlerInputData: any

@CreatedAt
createdAt: Date

@UpdatedAt
updatedAt: Date

static listWithLimitByCategory (limit: number, state: JobState, jobCategory: JobCategory) {
const query = {
order: [
[ 'id', 'ASC' ]
],
limit: limit,
where: {
state,
category: jobCategory
},
logging: false
}

return JobModel.findAll(query)
}

static listForApi (start: number, count: number, sort: string) {
const query = {
offset: start,
limit: count,
order: [ getSort(sort) ]
}

return JobModel.findAndCountAll(query).then(({ rows, count }) => {
return {
data: rows,
total: count
}
})
}

toFormattedJSON () {
return {
id: this.id,
state: this.state,
category: this.category,
handlerName: this.handlerName,
handlerInputData: this.handlerInputData,
createdAt: this.createdAt,
updatedAt: this.updatedAt
}
}
}

+ 10
- 1
server/tests/api/check-params/jobs.ts View File

@@ -7,7 +7,7 @@ import { checkBadCountPagination, checkBadSortPagination, checkBadStartPaginatio
import { makeGetRequest } from '../../utils/requests/requests'

describe('Test jobs API validators', function () {
const path = '/api/v1/jobs/'
const path = '/api/v1/jobs/failed'
let server: ServerInfo
let userAccessToken = ''

@@ -31,6 +31,15 @@ describe('Test jobs API validators', function () {
})

describe('When listing jobs', function () {

it('Should fail with a bad state', async function () {
await makeGetRequest({
url: server.url,
token: server.accessToken,
path: path + 'ade'
})
})

it('Should fail with a bad start pagination', async function () {
await checkBadStartPagination(server.url, path, server.accessToken)
})


+ 5
- 5
server/tests/api/server/handle-down.ts View File

@@ -2,6 +2,7 @@

import * as chai from 'chai'
import 'mocha'
import { JobState } from '../../../../shared/models'
import { VideoPrivacy } from '../../../../shared/models/videos'
import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model'
import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from &