@@ -28,6 +28,7 @@ export class JobsComponent extends RestTable implements OnInit { | |||
'activitypub-http-fetcher', | |||
'activitypub-http-unicast', | |||
'activitypub-refresher', | |||
'activitypub-cleaner', | |||
'actor-keys', | |||
'email', | |||
'video-file-import', | |||
@@ -192,6 +192,12 @@ federation: | |||
videos: | |||
federate_unlisted: false | |||
# Add a weekly job that cleans up remote AP interactions on local videos (shares, rates and comments) | |||
# It removes objects that do not exist anymore, and potentially fix their URLs | |||
# This setting is opt-in because due to an old bug in PeerTube, remote rates sent by instance before PeerTube 3.0 will be deleted | |||
# We still suggest you to enable this setting even if your users will loose most of their video's likes/dislikes | |||
cleanup_remote_interactions: false | |||
cache: | |||
previews: | |||
size: 500 # Max number of previews you want to cache | |||
@@ -190,6 +190,12 @@ federation: | |||
videos: | |||
federate_unlisted: false | |||
# Add a weekly job that cleans up remote AP interactions on local videos (shares, rates and comments) | |||
# It removes objects that do not exist anymore, and potentially fix their URLs | |||
# This setting is opt-in because due to an old bug in PeerTube, remote rates sent by instance before PeerTube 3.0 will be deleted | |||
# We still suggest you to enable this setting even if your users will loose most of their video's likes/dislikes | |||
cleanup_remote_interactions: false | |||
############################################################################### | |||
# | |||
@@ -1,15 +1,16 @@ | |||
import validator from 'validator' | |||
import { Activity, ActivityType } from '../../../../shared/models/activitypub' | |||
import { exists } from '../misc' | |||
import { sanitizeAndCheckActorObject } from './actor' | |||
import { isCacheFileObjectValid } from './cache-file' | |||
import { isFlagActivityValid } from './flag' | |||
import { isActivityPubUrlValid, isBaseActivityValid, isObjectValid } from './misc' | |||
import { isDislikeActivityValid } from './rate' | |||
import { isPlaylistObjectValid } from './playlist' | |||
import { isDislikeActivityValid, isLikeActivityValid } from './rate' | |||
import { isShareActivityValid } from './share' | |||
import { sanitizeAndCheckVideoCommentObject } from './video-comments' | |||
import { sanitizeAndCheckVideoTorrentObject } from './videos' | |||
import { isViewActivityValid } from './view' | |||
import { exists } from '../misc' | |||
import { isCacheFileObjectValid } from './cache-file' | |||
import { isFlagActivityValid } from './flag' | |||
import { isPlaylistObjectValid } from './playlist' | |||
function isRootActivityValid (activity: any) { | |||
return isCollection(activity) || isActivity(activity) | |||
@@ -70,8 +71,11 @@ function checkFlagActivity (activity: any) { | |||
} | |||
function checkDislikeActivity (activity: any) { | |||
return isBaseActivityValid(activity, 'Dislike') && | |||
isDislikeActivityValid(activity) | |||
return isDislikeActivityValid(activity) | |||
} | |||
function checkLikeActivity (activity: any) { | |||
return isLikeActivityValid(activity) | |||
} | |||
function checkCreateActivity (activity: any) { | |||
@@ -118,8 +122,7 @@ function checkRejectActivity (activity: any) { | |||
} | |||
function checkAnnounceActivity (activity: any) { | |||
return isBaseActivityValid(activity, 'Announce') && | |||
isObjectValid(activity.object) | |||
return isShareActivityValid(activity) | |||
} | |||
function checkUndoActivity (activity: any) { | |||
@@ -132,8 +135,3 @@ function checkUndoActivity (activity: any) { | |||
checkCreateActivity(activity.object) | |||
) | |||
} | |||
function checkLikeActivity (activity: any) { | |||
return isBaseActivityValid(activity, 'Like') && | |||
isObjectValid(activity.object) | |||
} |
@@ -1,13 +1,18 @@ | |||
import { isActivityPubUrlValid, isObjectValid } from './misc' | |||
import { isBaseActivityValid, isObjectValid } from './misc' | |||
function isLikeActivityValid (activity: any) { | |||
return isBaseActivityValid(activity, 'Like') && | |||
isObjectValid(activity.object) | |||
} | |||
function isDislikeActivityValid (activity: any) { | |||
return activity.type === 'Dislike' && | |||
isActivityPubUrlValid(activity.actor) && | |||
return isBaseActivityValid(activity, 'Dislike') && | |||
isObjectValid(activity.object) | |||
} | |||
// --------------------------------------------------------------------------- | |||
export { | |||
isDislikeActivityValid | |||
isDislikeActivityValid, | |||
isLikeActivityValid | |||
} |
@@ -0,0 +1,11 @@ | |||
import { isBaseActivityValid, isObjectValid } from './misc' | |||
function isShareActivityValid (activity: any) { | |||
return isBaseActivityValid(activity, 'Announce') && | |||
isObjectValid(activity.object) | |||
} | |||
// --------------------------------------------------------------------------- | |||
export { | |||
isShareActivityValid | |||
} |
@@ -36,7 +36,7 @@ function checkMissedConfig () { | |||
'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max', | |||
'theme.default', | |||
'remote_redundancy.videos.accept_from', | |||
'federation.videos.federate_unlisted', | |||
'federation.videos.federate_unlisted', 'federation.videos.cleanup_remote_interactions', | |||
'search.remote_uri.users', 'search.remote_uri.anonymous', 'search.search_index.enabled', 'search.search_index.url', | |||
'search.search_index.disable_local_search', 'search.search_index.is_default_search', | |||
'live.enabled', 'live.allow_replay', 'live.max_duration', 'live.max_user_lives', 'live.max_instance_lives', | |||
@@ -159,7 +159,8 @@ const CONFIG = { | |||
}, | |||
FEDERATION: { | |||
VIDEOS: { | |||
FEDERATE_UNLISTED: config.get<boolean>('federation.videos.federate_unlisted') | |||
FEDERATE_UNLISTED: config.get<boolean>('federation.videos.federate_unlisted'), | |||
CLEANUP_REMOTE_INTERACTIONS: config.get<boolean>('federation.videos.cleanup_remote_interactions') | |||
} | |||
}, | |||
ADMIN: { | |||
@@ -137,6 +137,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { | |||
'activitypub-http-unicast': 5, | |||
'activitypub-http-fetcher': 5, | |||
'activitypub-follow': 5, | |||
'activitypub-cleaner': 1, | |||
'video-file-import': 1, | |||
'video-transcoding': 1, | |||
'video-import': 1, | |||
@@ -147,10 +148,12 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { | |||
'video-redundancy': 1, | |||
'video-live-ending': 1 | |||
} | |||
const JOB_CONCURRENCY: { [id in JobType]?: number } = { | |||
// Excluded keys are jobs that can be configured by admins | |||
const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = { | |||
'activitypub-http-broadcast': 1, | |||
'activitypub-http-unicast': 5, | |||
'activitypub-http-fetcher': 1, | |||
'activitypub-cleaner': 1, | |||
'activitypub-follow': 1, | |||
'video-file-import': 1, | |||
'email': 5, | |||
@@ -165,6 +168,7 @@ const JOB_TTL: { [id in JobType]: number } = { | |||
'activitypub-http-unicast': 60000 * 10, // 10 minutes | |||
'activitypub-http-fetcher': 1000 * 3600 * 10, // 10 hours | |||
'activitypub-follow': 60000 * 10, // 10 minutes | |||
'activitypub-cleaner': 1000 * 3600, // 1 hour | |||
'video-file-import': 1000 * 3600, // 1 hour | |||
'video-transcoding': 1000 * 3600 * 48, // 2 days, transcoding could be long | |||
'video-import': 1000 * 3600 * 2, // 2 hours | |||
@@ -178,6 +182,9 @@ const JOB_TTL: { [id in JobType]: number } = { | |||
const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { | |||
'videos-views': { | |||
cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour | |||
}, | |||
'activitypub-cleaner': { | |||
cron: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM | |||
} | |||
} | |||
const JOB_PRIORITY = { | |||
@@ -188,6 +195,7 @@ const JOB_PRIORITY = { | |||
} | |||
const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job | |||
const AP_CLEANER_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-cleaner job | |||
const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...) | |||
const JOB_REQUEST_TIMEOUT = 7000 // 7 seconds | |||
const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2 // 2 days | |||
@@ -756,6 +764,7 @@ if (isTestInstance() === true) { | |||
SCHEDULER_INTERVALS_MS.autoFollowIndexInstances = 5000 | |||
SCHEDULER_INTERVALS_MS.updateInboxStats = 5000 | |||
REPEAT_JOBS['videos-views'] = { every: 5000 } | |||
REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 } | |||
REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 | |||
@@ -815,6 +824,7 @@ export { | |||
REDUNDANCY, | |||
JOB_CONCURRENCY, | |||
JOB_ATTEMPTS, | |||
AP_CLEANER_CONCURRENCY, | |||
LAST_MIGRATION_VERSION, | |||
OAUTH_LIFETIME, | |||
CUSTOM_HTML_TAG_COMMENTS, | |||
@@ -41,10 +41,10 @@ async function resolveThread (params: ResolveThreadParams): ResolveThreadResult | |||
return await tryResolveThreadFromVideo(params) | |||
} | |||
} catch (err) { | |||
logger.debug('Cannot get or create account and video and channel for reply %s, fetch comment', url, { err }) | |||
logger.debug('Cannot resolve thread from video %s, maybe because it was not a video', url, { err }) | |||
} | |||
return resolveParentComment(params) | |||
return resolveRemoteParentComment(params) | |||
} | |||
export { | |||
@@ -119,7 +119,7 @@ async function tryResolveThreadFromVideo (params: ResolveThreadParams) { | |||
return { video, comment: resultComment, commentCreated } | |||
} | |||
async function resolveParentComment (params: ResolveThreadParams) { | |||
async function resolveRemoteParentComment (params: ResolveThreadParams) { | |||
const { url, comments } = params | |||
if (comments.length > ACTIVITY_PUB.MAX_RECURSION_COMMENTS) { | |||
@@ -133,7 +133,7 @@ async function resolveParentComment (params: ResolveThreadParams) { | |||
}) | |||
if (sanitizeAndCheckVideoCommentObject(body) === false) { | |||
throw new Error('Remote video comment JSON is not valid:' + JSON.stringify(body)) | |||
throw new Error(`Remote video comment JSON ${url} is not valid:` + JSON.stringify(body)) | |||
} | |||
const actorUrl = body.attributedTo | |||
@@ -0,0 +1,194 @@ | |||
import * as Bluebird from 'bluebird' | |||
import * as Bull from 'bull' | |||
import { checkUrlsSameHost } from '@server/helpers/activitypub' | |||
import { isDislikeActivityValid, isLikeActivityValid } from '@server/helpers/custom-validators/activitypub/rate' | |||
import { isShareActivityValid } from '@server/helpers/custom-validators/activitypub/share' | |||
import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' | |||
import { doRequest } from '@server/helpers/requests' | |||
import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' | |||
import { VideoModel } from '@server/models/video/video' | |||
import { VideoCommentModel } from '@server/models/video/video-comment' | |||
import { VideoShareModel } from '@server/models/video/video-share' | |||
import { HttpStatusCode } from '@shared/core-utils' | |||
import { logger } from '../../../helpers/logger' | |||
import { AccountVideoRateModel } from '../../../models/account/account-video-rate' | |||
// Job to clean remote interactions off local videos | |||
async function processActivityPubCleaner (_job: Bull.Job) { | |||
logger.info('Processing ActivityPub cleaner.') | |||
{ | |||
const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos() | |||
const { bodyValidator, deleter, updater } = rateOptionsFactory() | |||
await Bluebird.map(rateUrls, async rateUrl => { | |||
try { | |||
const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter) | |||
if (result?.status === 'deleted') { | |||
const { videoId, type } = result.data | |||
await VideoModel.updateRatesOf(videoId, type, undefined) | |||
} | |||
} catch (err) { | |||
logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err }) | |||
} | |||
}, { concurrency: AP_CLEANER_CONCURRENCY }) | |||
} | |||
{ | |||
const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos() | |||
const { bodyValidator, deleter, updater } = shareOptionsFactory() | |||
await Bluebird.map(shareUrls, async shareUrl => { | |||
try { | |||
await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter) | |||
} catch (err) { | |||
logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err }) | |||
} | |||
}, { concurrency: AP_CLEANER_CONCURRENCY }) | |||
} | |||
{ | |||
const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos() | |||
const { bodyValidator, deleter, updater } = commentOptionsFactory() | |||
await Bluebird.map(commentUrls, async commentUrl => { | |||
try { | |||
await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter) | |||
} catch (err) { | |||
logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err }) | |||
} | |||
}, { concurrency: AP_CLEANER_CONCURRENCY }) | |||
} | |||
} | |||
// --------------------------------------------------------------------------- | |||
export { | |||
processActivityPubCleaner | |||
} | |||
// --------------------------------------------------------------------------- | |||
async function updateObjectIfNeeded <T> ( | |||
url: string, | |||
bodyValidator: (body: any) => boolean, | |||
updater: (url: string, newUrl: string) => Promise<T>, | |||
deleter: (url: string) => Promise<T> | |||
): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { | |||
// Fetch url | |||
const { response, body } = await doRequest<any>({ | |||
uri: url, | |||
json: true, | |||
activityPub: true | |||
}) | |||
// Does not exist anymore, remove entry | |||
if (response.statusCode === HttpStatusCode.NOT_FOUND_404) { | |||
logger.info('Removing remote AP object %s.', url) | |||
const data = await deleter(url) | |||
return { status: 'deleted', data } | |||
} | |||
// If not same id, check same host and update | |||
if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) | |||
if (body.type === 'Tombstone') { | |||
logger.info('Removing remote AP object %s.', url) | |||
const data = await deleter(url) | |||
return { status: 'deleted', data } | |||
} | |||
const newUrl = body.id | |||
if (newUrl !== url) { | |||
if (checkUrlsSameHost(newUrl, url) !== true) { | |||
throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) | |||
} | |||
logger.info('Updating remote AP object %s.', url) | |||
const data = await updater(url, newUrl) | |||
return { status: 'updated', data } | |||
} | |||
return null | |||
} | |||
function rateOptionsFactory () { | |||
return { | |||
bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body), | |||
updater: async (url: string, newUrl: string) => { | |||
const rate = await AccountVideoRateModel.loadByUrl(url, undefined) | |||
rate.url = newUrl | |||
const videoId = rate.videoId | |||
const type = rate.type | |||
await rate.save() | |||
return { videoId, type } | |||
}, | |||
deleter: async (url) => { | |||
const rate = await AccountVideoRateModel.loadByUrl(url, undefined) | |||
const videoId = rate.videoId | |||
const type = rate.type | |||
await rate.destroy() | |||
return { videoId, type } | |||
} | |||
} | |||
} | |||
function shareOptionsFactory () { | |||
return { | |||
bodyValidator: (body: any) => isShareActivityValid(body), | |||
updater: async (url: string, newUrl: string) => { | |||
const share = await VideoShareModel.loadByUrl(url, undefined) | |||
share.url = newUrl | |||
await share.save() | |||
return undefined | |||
}, | |||
deleter: async (url) => { | |||
const share = await VideoShareModel.loadByUrl(url, undefined) | |||
await share.destroy() | |||
return undefined | |||
} | |||
} | |||
} | |||
function commentOptionsFactory () { | |||
return { | |||
bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body), | |||
updater: async (url: string, newUrl: string) => { | |||
const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) | |||
comment.url = newUrl | |||
await comment.save() | |||
return undefined | |||
}, | |||
deleter: async (url) => { | |||
const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url) | |||
await comment.destroy() | |||
return undefined | |||
} | |||
} | |||
} |
@@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger' | |||
async function processActorKeys (job: Bull.Job) { | |||
const payload = job.data as ActorKeysPayload | |||
logger.info('Processing email in job %d.', job.id) | |||
logger.info('Processing actor keys in job %d.', job.id) | |||
const actor = await ActorModel.load(payload.actorId) | |||
@@ -21,6 +21,7 @@ import { | |||
import { logger } from '../../helpers/logger' | |||
import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' | |||
import { Redis } from '../redis' | |||
import { processActivityPubCleaner } from './handlers/activitypub-cleaner' | |||
import { processActivityPubFollow } from './handlers/activitypub-follow' | |||
import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | |||
import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | |||
@@ -38,6 +39,7 @@ type CreateJobArgument = | |||
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | |||
{ type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | |||
{ type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | | |||
{ type: 'activitypub-http-cleaner', payload: {} } | | |||
{ type: 'activitypub-follow', payload: ActivitypubFollowPayload } | | |||
{ type: 'video-file-import', payload: VideoFileImportPayload } | | |||
{ type: 'video-transcoding', payload: VideoTranscodingPayload } | | |||
@@ -58,6 +60,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | |||
'activitypub-http-broadcast': processActivityPubHttpBroadcast, | |||
'activitypub-http-unicast': processActivityPubHttpUnicast, | |||
'activitypub-http-fetcher': processActivityPubHttpFetcher, | |||
'activitypub-cleaner': processActivityPubCleaner, | |||
'activitypub-follow': processActivityPubFollow, | |||
'video-file-import': processVideoFileImport, | |||
'video-transcoding': processVideoTranscoding, | |||
@@ -75,6 +78,7 @@ const jobTypes: JobType[] = [ | |||
'activitypub-http-broadcast', | |||
'activitypub-http-fetcher', | |||
'activitypub-http-unicast', | |||
'activitypub-cleaner', | |||
'email', | |||
'video-transcoding', | |||
'video-file-import', | |||
@@ -233,6 +237,12 @@ class JobQueue { | |||
this.queues['videos-views'].add({}, { | |||
repeat: REPEAT_JOBS['videos-views'] | |||
}).catch(err => logger.error('Cannot add repeatable job.', { err })) | |||
if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { | |||
this.queues['activitypub-cleaner'].add({}, { | |||
repeat: REPEAT_JOBS['activitypub-cleaner'] | |||
}).catch(err => logger.error('Cannot add repeatable job.', { err })) | |||
} | |||
} | |||
private filterJobTypes (jobType?: JobType) { | |||
@@ -1,6 +1,6 @@ | |||
import * as express from 'express' | |||
import { body, param, query } from 'express-validator' | |||
import { isIdOrUUIDValid } from '../../../helpers/custom-validators/misc' | |||
import { isIdOrUUIDValid, isIdValid } from '../../../helpers/custom-validators/misc' | |||
import { isRatingValid } from '../../../helpers/custom-validators/video-rates' | |||
import { isVideoRatingTypeValid } from '../../../helpers/custom-validators/videos' | |||
import { logger } from '../../../helpers/logger' | |||
@@ -28,14 +28,14 @@ const videoUpdateRateValidator = [ | |||
const getAccountVideoRateValidatorFactory = function (rateType: VideoRateType) { | |||
return [ | |||
param('name').custom(isAccountNameValid).withMessage('Should have a valid account name'), | |||
param('videoId').custom(isIdOrUUIDValid).not().isEmpty().withMessage('Should have a valid videoId'), | |||
param('videoId').custom(isIdValid).not().isEmpty().withMessage('Should have a valid videoId'), | |||
async (req: express.Request, res: express.Response, next: express.NextFunction) => { | |||
logger.debug('Checking videoCommentGetValidator parameters.', { parameters: req.params }) | |||
if (areValidationErrors(req, res)) return | |||
const rate = await AccountVideoRateModel.loadLocalAndPopulateVideo(rateType, req.params.name, req.params.videoId) | |||
const rate = await AccountVideoRateModel.loadLocalAndPopulateVideo(rateType, req.params.name, +req.params.videoId) | |||
if (!rate) { | |||
return res.status(HttpStatusCode.NOT_FOUND_404) | |||
.json({ error: 'Video rate not found' }) | |||
@@ -146,10 +146,22 @@ export class AccountVideoRateModel extends Model { | |||
return AccountVideoRateModel.findAndCountAll(query) | |||
} | |||
static listRemoteRateUrlsOfLocalVideos () { | |||
const query = `SELECT "accountVideoRate".url FROM "accountVideoRate" ` + | |||
`INNER JOIN account ON account.id = "accountVideoRate"."accountId" ` + | |||
`INNER JOIN actor ON actor.id = account."actorId" AND actor."serverId" IS NOT NULL ` + | |||
`INNER JOIN video ON video.id = "accountVideoRate"."videoId" AND video.remote IS FALSE` | |||
return AccountVideoRateModel.sequelize.query<{ url: string }>(query, { | |||
type: QueryTypes.SELECT, | |||
raw: true | |||
}).then(rows => rows.map(r => r.url)) | |||
} | |||
static loadLocalAndPopulateVideo ( | |||
rateType: VideoRateType, | |||
accountName: string, | |||
videoId: number | string, | |||
videoId: number, | |||
t?: Transaction | |||
): Promise<MAccountVideoRateAccountVideo> { | |||
const options: FindOptions = { | |||
@@ -241,21 +253,7 @@ export class AccountVideoRateModel extends Model { | |||
await AccountVideoRateModel.destroy(query) | |||
const field = type === 'like' | |||
? 'likes' | |||
: 'dislikes' | |||
const rawQuery = `UPDATE "video" SET "${field}" = ` + | |||
'(' + | |||
'SELECT COUNT(id) FROM "accountVideoRate" WHERE "accountVideoRate"."videoId" = "video"."id" AND type = :rateType' + | |||
') ' + | |||
'WHERE "video"."id" = :videoId' | |||
return AccountVideoRateModel.sequelize.query(rawQuery, { | |||
transaction: t, | |||
replacements: { videoId, rateType: type }, | |||
type: QueryTypes.UPDATE | |||
}) | |||
return VideoModel.updateRatesOf(videoId, type, t) | |||
}) | |||
} | |||
@@ -1,5 +1,5 @@ | |||
import { uniq } from 'lodash' | |||
import { FindAndCountOptions, FindOptions, Op, Order, ScopeOptions, Sequelize, Transaction, WhereOptions } from 'sequelize' | |||
import { FindAndCountOptions, FindOptions, Op, Order, QueryTypes, ScopeOptions, Sequelize, Transaction, WhereOptions } from 'sequelize' | |||
import { | |||
AllowNull, | |||
BelongsTo, | |||
@@ -696,6 +696,18 @@ export class VideoCommentModel extends Model { | |||
} | |||
} | |||
static listRemoteCommentUrlsOfLocalVideos () { | |||
const query = `SELECT "videoComment".url FROM "videoComment" ` + | |||
`INNER JOIN account ON account.id = "videoComment"."accountId" ` + | |||
`INNER JOIN actor ON actor.id = "account"."actorId" AND actor."serverId" IS NOT NULL ` + | |||
`INNER JOIN video ON video.id = "videoComment"."videoId" AND video.remote IS FALSE` | |||
return VideoCommentModel.sequelize.query<{ url: string }>(query, { | |||
type: QueryTypes.SELECT, | |||
raw: true | |||
}).then(rows => rows.map(r => r.url)) | |||
} | |||
static cleanOldCommentsOf (videoId: number, beforeUpdatedAt: Date) { | |||
const query = { | |||
where: { | |||
@@ -1,4 +1,4 @@ | |||
import { literal, Op, Transaction } from 'sequelize' | |||
import { literal, Op, QueryTypes, Transaction } from 'sequelize' | |||
import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Is, Model, Scopes, Table, UpdatedAt } from 'sequelize-typescript' | |||
import { isActivityPubUrlValid } from '../../helpers/custom-validators/activitypub/misc' | |||
import { CONSTRAINTS_FIELDS } from '../../initializers/constants' | |||
@@ -185,6 +185,17 @@ export class VideoShareModel extends Model { | |||
return VideoShareModel.findAndCountAll(query) | |||
} | |||
static listRemoteShareUrlsOfLocalVideos () { | |||
const query = `SELECT "videoShare".url FROM "videoShare" ` + | |||
`INNER JOIN actor ON actor.id = "videoShare"."actorId" AND actor."serverId" IS NOT NULL ` + | |||
`INNER JOIN video ON video.id = "videoShare"."videoId" AND video.remote IS FALSE` | |||
return VideoShareModel.sequelize.query<{ url: string }>(query, { | |||
type: QueryTypes.SELECT, | |||
raw: true | |||
}).then(rows => rows.map(r => r.url)) | |||
} | |||
static cleanOldSharesOf (videoId: number, beforeUpdatedAt: Date) { | |||
const query = { | |||
where: { | |||
@@ -34,7 +34,7 @@ import { ModelCache } from '@server/models/model-cache' | |||
import { VideoFile } from '@shared/models/videos/video-file.model' | |||
import { ResultList, UserRight, VideoPrivacy, VideoState } from '../../../shared' | |||
import { VideoObject } from '../../../shared/models/activitypub/objects' | |||
import { Video, VideoDetails } from '../../../shared/models/videos' | |||
import { Video, VideoDetails, VideoRateType } from '../../../shared/models/videos' | |||
import { ThumbnailType } from '../../../shared/models/videos/thumbnail.type' | |||
import { VideoFilter } from '../../../shared/models/videos/video-query.type' | |||
import { VideoStreamingPlaylistType } from '../../../shared/models/videos/video-streaming-playlist.type' | |||
@@ -1509,6 +1509,24 @@ export class VideoModel extends Model { | |||
}) | |||
} | |||
static updateRatesOf (videoId: number, type: VideoRateType, t: Transaction) { | |||
const field = type === 'like' | |||
? 'likes' | |||
: 'dislikes' | |||
const rawQuery = `UPDATE "video" SET "${field}" = ` + | |||
'(' + | |||
'SELECT COUNT(id) FROM "accountVideoRate" WHERE "accountVideoRate"."videoId" = "video"."id" AND type = :rateType' + | |||
') ' + | |||
'WHERE "video"."id" = :videoId' | |||
return AccountVideoRateModel.sequelize.query(rawQuery, { | |||
transaction: t, | |||
replacements: { videoId, rateType: type }, | |||
type: QueryTypes.UPDATE | |||
}) | |||
} | |||
static checkVideoHasInstanceFollow (videoId: number, followerActorId: number) { | |||
// Instances only share videos | |||
const query = 'SELECT 1 FROM "videoShare" ' + | |||
@@ -0,0 +1,283 @@ | |||
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ | |||
import 'mocha' | |||
import * as chai from 'chai' | |||
import { | |||
cleanupTests, | |||
closeAllSequelize, | |||
deleteAll, | |||
doubleFollow, | |||
getCount, | |||
selectQuery, | |||
setVideoField, | |||
updateQuery, | |||
wait | |||
} from '../../../../shared/extra-utils' | |||
import { flushAndRunMultipleServers, ServerInfo, setAccessTokensToServers } from '../../../../shared/extra-utils/index' | |||
import { waitJobs } from '../../../../shared/extra-utils/server/jobs' | |||
import { addVideoCommentThread, getVideoCommentThreads } from '../../../../shared/extra-utils/videos/video-comments' | |||
import { getVideo, rateVideo, uploadVideoAndGetId } from '../../../../shared/extra-utils/videos/videos' | |||
const expect = chai.expect | |||
describe('Test AP cleaner', function () { | |||
let servers: ServerInfo[] = [] | |||
let videoUUID1: string | |||
let videoUUID2: string | |||
let videoUUID3: string | |||
let videoUUIDs: string[] | |||
before(async function () { | |||
this.timeout(120000) | |||
const config = { | |||
federation: { | |||
videos: { cleanup_remote_interactions: true } | |||
} | |||
} | |||
servers = await flushAndRunMultipleServers(3, config) | |||
// Get the access tokens | |||
await setAccessTokensToServers(servers) | |||
await Promise.all([ | |||
doubleFollow(servers[0], servers[1]), | |||
doubleFollow(servers[1], servers[2]), | |||
doubleFollow(servers[0], servers[2]) | |||
]) | |||
// Update 1 local share, check 6 shares | |||
// Create 1 comment per video | |||
// Update 1 remote URL and 1 local URL on | |||
videoUUID1 = (await uploadVideoAndGetId({ server: servers[0], videoName: 'server 1' })).uuid | |||
videoUUID2 = (await uploadVideoAndGetId({ server: servers[1], videoName: 'server 2' })).uuid | |||
videoUUID3 = (await uploadVideoAndGetId({ server: servers[2], videoName: 'server 3' })).uuid | |||
videoUUIDs = [ videoUUID1, videoUUID2, videoUUID3 ] | |||
await waitJobs(servers) | |||
for (const server of servers) { | |||
for (const uuid of videoUUIDs) { | |||
await rateVideo(server.url, server.accessToken, uuid, 'like') | |||
await addVideoCommentThread(server.url, server.accessToken, uuid, 'comment') | |||
} | |||
} | |||
await waitJobs(servers) | |||
}) | |||
it('Should have the correct likes', async function () { | |||
for (const server of servers) { | |||
for (const uuid of videoUUIDs) { | |||
const res = await getVideo(server.url, uuid) | |||
expect(res.body.likes).to.equal(3) | |||
expect(res.body.dislikes).to.equal(0) | |||
} | |||
} | |||
}) | |||
it('Should destroy server 3 internal likes and correctly clean them', async function () { | |||
this.timeout(20000) | |||
await deleteAll(servers[2].internalServerNumber, 'accountVideoRate') | |||
for (const uuid of videoUUIDs) { | |||
await setVideoField(servers[2].internalServerNumber, uuid, 'likes', '0') | |||
} | |||
await wait(5000) | |||
await waitJobs(servers) | |||
// Updated rates of my video | |||
{ | |||
const res = await getVideo(servers[0].url, videoUUID1) | |||
expect(res.body.likes).to.equal(2) | |||
expect(res.body.dislikes).to.equal(0) | |||
} | |||
// Did not update rates of a remote video | |||
{ | |||
const res = await getVideo(servers[0].url, videoUUID2) | |||
expect(res.body.likes).to.equal(3) | |||
expect(res.body.dislikes).to.equal(0) | |||
} | |||
}) | |||
it('Should update rates to dislikes', async function () { | |||
this.timeout(20000) | |||
for (const server of servers) { | |||
for (const uuid of videoUUIDs) { | |||
await rateVideo(server.url, server.accessToken, uuid, 'dislike') | |||
} | |||
} | |||
await waitJobs(servers) | |||
for (const server of servers) { | |||
for (const uuid of videoUUIDs) { | |||
const res = await getVideo(server.url, uuid) | |||
expect(res.body.likes).to.equal(0) | |||
expect(res.body.dislikes).to.equal(3) | |||
} | |||
} | |||
}) | |||
it('Should destroy server 3 internal dislikes and correctly clean them', async function () { | |||
this.timeout(20000) | |||
await deleteAll(servers[2].internalServerNumber, 'accountVideoRate') | |||
for (const uuid of videoUUIDs) { | |||
await setVideoField(servers[2].internalServerNumber, uuid, 'dislikes', '0') | |||
} | |||
await wait(5000) | |||
await waitJobs(servers) | |||
// Updated rates of my video | |||
{ | |||
const res = await getVideo(servers[0].url, videoUUID1) | |||
expect(res.body.likes).to.equal(0) | |||
expect(res.body.dislikes).to.equal(2) | |||
} | |||
// Did not update rates of a remote video | |||
{ | |||
const res = await getVideo(servers[0].url, videoUUID2) | |||
expect(res.body.likes).to.equal(0) | |||
expect(res.body.dislikes).to.equal(3) | |||
} | |||
}) | |||
it('Should destroy server 3 internal shares and correctly clean them', async function () { | |||
this.timeout(20000) | |||
const preCount = await getCount(servers[0].internalServerNumber, 'videoShare') | |||
expect(preCount).to.equal(6) | |||
await deleteAll(servers[2].internalServerNumber, 'videoShare') | |||
await wait(5000) | |||
await waitJobs(servers) | |||
// Still 6 because we don't have remote shares on local videos | |||
const postCount = await getCount(servers[0].internalServerNumber, 'videoShare') | |||
expect(postCount).to.equal(6) | |||
}) | |||
it('Should destroy server 3 internal comments and correctly clean them', async function () { | |||
this.timeout(20000) | |||
{ | |||
const res = await getVideoCommentThreads(servers[0].url, videoUUID1, 0, 5) | |||
expect(res.body.total).to.equal(3) | |||
} | |||
await deleteAll(servers[2].internalServerNumber, 'videoComment') | |||
await wait(5000) | |||
await waitJobs(servers) | |||
{ | |||
const res = await getVideoCommentThreads(servers[0].url, videoUUID1, 0, 5) | |||
expect(res.body.total).to.equal(2) | |||
} | |||
}) | |||
it('Should correctly update rate URLs', async function () { | |||
this.timeout(30000) | |||
async function check (like: string, ofServerUrl: string, urlSuffix: string, remote: 'true' | 'false') { | |||
const query = `SELECT "videoId", "accountVideoRate".url FROM "accountVideoRate" ` + | |||
`INNER JOIN video ON "accountVideoRate"."videoId" = video.id AND remote IS ${remote} WHERE "accountVideoRate"."url" LIKE '${like}'` | |||
const res = await selectQuery(servers[0].internalServerNumber, query) | |||
for (const rate of res) { | |||
const matcher = new RegExp(`^${ofServerUrl}/accounts/root/dislikes/\\d+${urlSuffix}$`) | |||
expect(rate.url).to.match(matcher) | |||
} | |||
} | |||
async function checkLocal () { | |||
const startsWith = 'http://' + servers[0].host + '%' | |||
// On local videos | |||
await check(startsWith, servers[0].url, '', 'false') | |||
// On remote videos | |||
await check(startsWith, servers[0].url, '', 'true') | |||
} | |||
async function checkRemote (suffix: string) { | |||
const startsWith = 'http://' + servers[1].host + '%' | |||
// On local videos | |||
await check(startsWith, servers[1].url, suffix, 'false') | |||
// On remote videos, we should not update URLs so no suffix | |||
await check(startsWith, servers[1].url, '', 'true') | |||
} | |||
await checkLocal() | |||
await checkRemote('') | |||
{ | |||
const query = `UPDATE "accountVideoRate" SET url = url || 'stan'` | |||
await updateQuery(servers[1].internalServerNumber, query) | |||
await wait(5000) | |||
await waitJobs(servers) | |||
} | |||
await checkLocal() | |||
await checkRemote('stan') | |||
}) | |||
it('Should correctly update comment URLs', async function () { | |||
this.timeout(30000) | |||
async function check (like: string, ofServerUrl: string, urlSuffix: string, remote: 'true' | 'false') { | |||
const query = `SELECT "videoId", "videoComment".url, uuid as "videoUUID" FROM "videoComment" ` + | |||
`INNER JOIN video ON "videoComment"."videoId" = video.id AND remote IS ${remote} WHERE "videoComment"."url" LIKE '${like}'` | |||
const res = await selectQuery(servers[0].internalServerNumber, query) | |||
for (const comment of res) { | |||
const matcher = new RegExp(`${ofServerUrl}/videos/watch/${comment.videoUUID}/comments/\\d+${urlSuffix}`) | |||
expect(comment.url).to.match(matcher) | |||
} | |||
} | |||
async function checkLocal () { | |||
const startsWith = 'http://' + servers[0].host + '%' | |||
// On local videos | |||
await check(startsWith, servers[0].url, '', 'false') | |||
// On remote videos | |||
await check(startsWith, servers[0].url, '', 'true') | |||
} | |||
async function checkRemote (suffix: string) { | |||
const startsWith = 'http://' + servers[1].host + '%' | |||
// On local videos | |||
await check(startsWith, servers[1].url, suffix, 'false') | |||
// On remote videos, we should not update URLs so no suffix | |||
await check(startsWith, servers[1].url, '', 'true') | |||
} | |||
{ | |||
const query = `UPDATE "videoComment" SET url = url || 'kyle'` | |||
await updateQuery(servers[1].internalServerNumber, query) | |||
await wait(5000) | |||
await waitJobs(servers) | |||
} | |||
await checkLocal() | |||
await checkRemote('kyle') | |||
}) | |||
after(async function () { | |||
await cleanupTests(servers) | |||
await closeAllSequelize(servers) | |||
}) | |||
}) |
@@ -1,3 +1,4 @@ | |||
import './cleaner' | |||
import './client' | |||
import './fetch' | |||
import './refresher' | |||
@@ -24,6 +24,25 @@ function getSequelize (internalServerNumber: number) { | |||
return seq | |||
} | |||
function deleteAll (internalServerNumber: number, table: string) { | |||
const seq = getSequelize(internalServerNumber) | |||
const options = { type: QueryTypes.DELETE } | |||
return seq.query(`DELETE FROM "${table}"`, options) | |||
} | |||
async function getCount (internalServerNumber: number, table: string) { | |||
const seq = getSequelize(internalServerNumber) | |||
const options = { type: QueryTypes.SELECT as QueryTypes.SELECT } | |||
const [ { total } ] = await seq.query<{ total: string }>(`SELECT COUNT(*) as total FROM "${table}"`, options) | |||
if (total === null) return 0 | |||
return parseInt(total, 10) | |||
} | |||
function setActorField (internalServerNumber: number, to: string, field: string, value: string) { | |||
const seq = getSequelize(internalServerNumber) | |||
@@ -63,6 +82,20 @@ async function countVideoViewsOf (internalServerNumber: number, uuid: string) { | |||
return parseInt(total + '', 10) | |||
} | |||
function selectQuery (internalServerNumber: number, query: string) { | |||
const seq = getSequelize(internalServerNumber) | |||
const options = { type: QueryTypes.SELECT as QueryTypes.SELECT } | |||
return seq.query<any>(query, options) | |||
} | |||
function updateQuery (internalServerNumber: number, query: string) { | |||
const seq = getSequelize(internalServerNumber) | |||
const options = { type: QueryTypes.UPDATE as QueryTypes.UPDATE } | |||
return seq.query(query, options) | |||
} | |||
async function closeAllSequelize (servers: ServerInfo[]) { | |||
for (const server of servers) { | |||
if (sequelizes[server.internalServerNumber]) { | |||
@@ -95,6 +128,10 @@ export { | |||
setActorField, | |||
countVideoViewsOf, | |||
setPluginVersion, | |||
selectQuery, | |||
deleteAll, | |||
updateQuery, | |||
setActorFollowScores, | |||
closeAllSequelize | |||
closeAllSequelize, | |||
getCount | |||
} |
@@ -63,6 +63,7 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) { | |||
else servers = serversArg as ServerInfo[] | |||
const states: JobState[] = [ 'waiting', 'active', 'delayed' ] | |||
const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ] | |||
let pendingRequests: boolean | |||
function tasksBuilder () { | |||
@@ -79,7 +80,7 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) { | |||
count: 10, | |||
sort: '-createdAt' | |||
}).then(res => res.body.data) | |||
.then((jobs: Job[]) => jobs.filter(j => j.type !== 'videos-views')) | |||
.then((jobs: Job[]) => jobs.filter(j => !repeatableJobs.includes(j.type))) | |||
.then(jobs => { | |||
if (jobs.length !== 0) { | |||
pendingRequests = true | |||
@@ -498,7 +498,7 @@ function updateVideo ( | |||
}) | |||
} | |||
function rateVideo (url: string, accessToken: string, id: number, rating: string, specialStatus = HttpStatusCode.NO_CONTENT_204) { | |||
function rateVideo (url: string, accessToken: string, id: number | string, rating: string, specialStatus = HttpStatusCode.NO_CONTENT_204) { | |||
const path = '/api/v1/videos/' + id + '/rate' | |||
return request(url) | |||
@@ -8,6 +8,7 @@ export type JobType = | |||
| 'activitypub-http-unicast' | |||
| 'activitypub-http-broadcast' | |||
| 'activitypub-http-fetcher' | |||
| 'activitypub-cleaner' | |||
| 'activitypub-follow' | |||
| 'video-file-import' | |||
| 'video-transcoding' | |||