@@ -94,7 +94,7 @@ import { | |||
} from './server/controllers' | |||
import { advertiseDoNotTrack } from './server/middlewares/dnt' | |||
import { Redis } from './server/lib/redis' | |||
import { BadActorFollowScheduler } from './server/lib/schedulers/bad-actor-follow-scheduler' | |||
import { ActorFollowScheduler } from './server/lib/schedulers/actor-follow-scheduler' | |||
import { RemoveOldJobsScheduler } from './server/lib/schedulers/remove-old-jobs-scheduler' | |||
import { UpdateVideosScheduler } from './server/lib/schedulers/update-videos-scheduler' | |||
import { YoutubeDlUpdateScheduler } from './server/lib/schedulers/youtube-dl-update-scheduler' | |||
@@ -219,7 +219,7 @@ async function startApplication () { | |||
VideosCaptionCache.Instance.init(CONFIG.CACHE.VIDEO_CAPTIONS.SIZE, CACHE.VIDEO_CAPTIONS.MAX_AGE) | |||
// Enable Schedulers | |||
BadActorFollowScheduler.Instance.enable() | |||
ActorFollowScheduler.Instance.enable() | |||
RemoveOldJobsScheduler.Instance.enable() | |||
UpdateVideosScheduler.Instance.enable() | |||
YoutubeDlUpdateScheduler.Instance.enable() | |||
@@ -144,7 +144,7 @@ const VIDEO_IMPORT_TIMEOUT = 1000 * 3600 // 1 hour | |||
// 1 hour | |||
let SCHEDULER_INTERVALS_MS = { | |||
badActorFollow: 60000 * 60, // 1 hour | |||
actorFollowScores: 60000 * 60, // 1 hour | |||
removeOldJobs: 60000 * 60, // 1 hour | |||
updateVideos: 60000, // 1 minute | |||
youtubeDLUpdate: 60000 * 60 * 24 // 1 day | |||
@@ -675,7 +675,7 @@ if (isTestInstance() === true) { | |||
CONSTRAINTS_FIELDS.ACTORS.AVATAR.FILE_SIZE.max = 100 * 1024 // 100KB | |||
SCHEDULER_INTERVALS_MS.badActorFollow = 10000 | |||
SCHEDULER_INTERVALS_MS.actorFollowScores = 1000 | |||
SCHEDULER_INTERVALS_MS.removeOldJobs = 10000 | |||
SCHEDULER_INTERVALS_MS.updateVideos = 5000 | |||
REPEAT_JOBS['videos-views'] = { every: 5000 } | |||
@@ -0,0 +1,46 @@ | |||
import { ACTOR_FOLLOW_SCORE } from '../../initializers' | |||
import { logger } from '../../helpers/logger' | |||
// Cache follows scores, instead of writing them too often in database | |||
// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores | |||
class ActorFollowScoreCache { | |||
private static instance: ActorFollowScoreCache | |||
private pendingFollowsScore: { [ url: string ]: number } = {} | |||
private constructor () {} | |||
static get Instance () { | |||
return this.instance || (this.instance = new this()) | |||
} | |||
updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) { | |||
if (goodInboxes.length === 0 && badInboxes.length === 0) return | |||
logger.info('Updating %d good actor follows and %d bad actor follows scores in cache.', goodInboxes.length, badInboxes.length) | |||
for (const goodInbox of goodInboxes) { | |||
if (this.pendingFollowsScore[goodInbox] === undefined) this.pendingFollowsScore[goodInbox] = 0 | |||
this.pendingFollowsScore[goodInbox] += ACTOR_FOLLOW_SCORE.BONUS | |||
} | |||
for (const badInbox of badInboxes) { | |||
if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0 | |||
this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY | |||
} | |||
} | |||
getPendingFollowsScoreCopy () { | |||
return this.pendingFollowsScore | |||
} | |||
clearPendingFollowsScore () { | |||
this.pendingFollowsScore = {} | |||
} | |||
} | |||
export { | |||
ActorFollowScoreCache | |||
} |
@@ -1,2 +1,3 @@ | |||
export * from './actor-follow-score-cache' | |||
export * from './videos-preview-cache' | |||
export * from './videos-caption-cache' |
@@ -5,6 +5,7 @@ import { doRequest } from '../../../helpers/requests' | |||
import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | |||
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | |||
import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' | |||
import { ActorFollowScoreCache } from '../../cache' | |||
export type ActivitypubHttpBroadcastPayload = { | |||
uris: string[] | |||
@@ -38,7 +39,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) { | |||
.catch(() => badUrls.push(uri)) | |||
}, { concurrency: BROADCAST_CONCURRENCY }) | |||
return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined) | |||
return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) | |||
} | |||
// --------------------------------------------------------------------------- | |||
@@ -1,9 +1,9 @@ | |||
import * as Bull from 'bull' | |||
import { logger } from '../../../helpers/logger' | |||
import { doRequest } from '../../../helpers/requests' | |||
import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | |||
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | |||
import { JOB_REQUEST_TIMEOUT } from '../../../initializers' | |||
import { ActorFollowScoreCache } from '../../cache' | |||
export type ActivitypubHttpUnicastPayload = { | |||
uri: string | |||
@@ -31,9 +31,9 @@ async function processActivityPubHttpUnicast (job: Bull.Job) { | |||
try { | |||
await doRequest(options) | |||
ActorFollowModel.updateActorFollowsScore([ uri ], [], undefined) | |||
ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) | |||
} catch (err) { | |||
ActorFollowModel.updateActorFollowsScore([], [ uri ], undefined) | |||
ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) | |||
throw err | |||
} | |||
@@ -165,10 +165,10 @@ class JobQueue { | |||
return total | |||
} | |||
removeOldJobs () { | |||
async removeOldJobs () { | |||
for (const key of Object.keys(this.queues)) { | |||
const queue = this.queues[key] | |||
queue.clean(JOB_COMPLETED_LIFETIME, 'completed') | |||
await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') | |||
} | |||
} | |||
@@ -1,8 +1,11 @@ | |||
import { logger } from '../../helpers/logger' | |||
export abstract class AbstractScheduler { | |||
protected abstract schedulerIntervalMs: number | |||
private interval: NodeJS.Timer | |||
private isRunning = false | |||
enable () { | |||
if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.') | |||
@@ -14,5 +17,18 @@ export abstract class AbstractScheduler { | |||
clearInterval(this.interval) | |||
} | |||
abstract execute () | |||
async execute () { | |||
if (this.isRunning === true) return | |||
this.isRunning = true | |||
try { | |||
await this.internalExecute() | |||
} catch (err) { | |||
logger.error('Cannot execute %s scheduler.', this.constructor.name, { err }) | |||
} finally { | |||
this.isRunning = false | |||
} | |||
} | |||
protected abstract internalExecute (): Promise<any> | |||
} |
@@ -3,18 +3,35 @@ import { logger } from '../../helpers/logger' | |||
import { ActorFollowModel } from '../../models/activitypub/actor-follow' | |||
import { AbstractScheduler } from './abstract-scheduler' | |||
import { SCHEDULER_INTERVALS_MS } from '../../initializers' | |||
import { ActorFollowScoreCache } from '../cache' | |||
export class BadActorFollowScheduler extends AbstractScheduler { | |||
export class ActorFollowScheduler extends AbstractScheduler { | |||
private static instance: AbstractScheduler | |||
protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.badActorFollow | |||
protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.actorFollowScores | |||
private constructor () { | |||
super() | |||
} | |||
async execute () { | |||
protected async internalExecute () { | |||
await this.processPendingScores() | |||
await this.removeBadActorFollows() | |||
} | |||
private async processPendingScores () { | |||
const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScoreCopy() | |||
ActorFollowScoreCache.Instance.clearPendingFollowsScore() | |||
for (const inbox of Object.keys(pendingScores)) { | |||
await ActorFollowModel.updateFollowScore(inbox, pendingScores[inbox]) | |||
} | |||
} | |||
private async removeBadActorFollows () { | |||
if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).') | |||
try { |
@@ -14,10 +14,10 @@ export class RemoveOldJobsScheduler extends AbstractScheduler { | |||
super() | |||
} | |||
async execute () { | |||
if (!isTestInstance()) logger.info('Removing old jobs (scheduler).') | |||
protected internalExecute () { | |||
if (!isTestInstance()) logger.info('Removing old jobs in scheduler.') | |||
JobQueue.Instance.removeOldJobs() | |||
return JobQueue.Instance.removeOldJobs() | |||
} | |||
static get Instance () { | |||
@@ -12,23 +12,12 @@ export class UpdateVideosScheduler extends AbstractScheduler { | |||
protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.updateVideos | |||
private isRunning = false | |||
private constructor () { | |||
super() | |||
} | |||
async execute () { | |||
if (this.isRunning === true) return | |||
this.isRunning = true | |||
try { | |||
await retryTransactionWrapper(this.updateVideos.bind(this)) | |||
} catch (err) { | |||
logger.error('Cannot execute update videos scheduler.', { err }) | |||
} finally { | |||
this.isRunning = false | |||
} | |||
protected async internalExecute () { | |||
return retryTransactionWrapper(this.updateVideos.bind(this)) | |||
} | |||
private async updateVideos () { | |||
@@ -16,7 +16,6 @@ import { getOrCreateVideoAndAccountAndChannel } from '../activitypub' | |||
export class VideosRedundancyScheduler extends AbstractScheduler { | |||
private static instance: AbstractScheduler | |||
private executing = false | |||
protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL | |||
@@ -24,11 +23,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
super() | |||
} | |||
async execute () { | |||
if (this.executing) return | |||
this.executing = true | |||
protected async internalExecute () { | |||
for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { | |||
logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) | |||
@@ -57,8 +52,6 @@ export class VideosRedundancyScheduler extends AbstractScheduler { | |||
await this.extendsLocalExpiration() | |||
await this.purgeRemoteExpired() | |||
this.executing = false | |||
} | |||
static get Instance () { | |||
@@ -12,7 +12,7 @@ export class YoutubeDlUpdateScheduler extends AbstractScheduler { | |||
super() | |||
} | |||
execute () { | |||
protected internalExecute () { | |||
return updateYoutubeDLBinary() | |||
} | |||
@@ -127,22 +127,6 @@ export class ActorFollowModel extends Model<ActorFollowModel> { | |||
if (numberOfActorFollowsRemoved) logger.info('Removed bad %d actor follows.', numberOfActorFollowsRemoved) | |||
} | |||
static updateActorFollowsScore (goodInboxes: string[], badInboxes: string[], t: Sequelize.Transaction | undefined) { | |||
if (goodInboxes.length === 0 && badInboxes.length === 0) return | |||
logger.info('Updating %d good actor follows and %d bad actor follows scores.', goodInboxes.length, badInboxes.length) | |||
if (goodInboxes.length !== 0) { | |||
ActorFollowModel.incrementScores(goodInboxes, ACTOR_FOLLOW_SCORE.BONUS, t) | |||
.catch(err => logger.error('Cannot increment scores of good actor follows.', { err })) | |||
} | |||
if (badInboxes.length !== 0) { | |||
ActorFollowModel.incrementScores(badInboxes, ACTOR_FOLLOW_SCORE.PENALTY, t) | |||
.catch(err => logger.error('Cannot decrement scores of bad actor follows.', { err })) | |||
} | |||
} | |||
static loadByActorAndTarget (actorId: number, targetActorId: number, t?: Sequelize.Transaction) { | |||
const query = { | |||
where: { | |||
@@ -464,6 +448,22 @@ export class ActorFollowModel extends Model<ActorFollowModel> { | |||
} | |||
} | |||
static updateFollowScore (inboxUrl: string, value: number, t?: Sequelize.Transaction) { | |||
const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` + | |||
'WHERE id IN (' + | |||
'SELECT "actorFollow"."id" FROM "actorFollow" ' + | |||
'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."actorId" ' + | |||
`WHERE "actor"."inboxUrl" = '${inboxUrl}' OR "actor"."sharedInboxUrl" = '${inboxUrl}'` + | |||
')' | |||
const options = { | |||
type: Sequelize.QueryTypes.BULKUPDATE, | |||
transaction: t | |||
} | |||
return ActorFollowModel.sequelize.query(query, options) | |||
} | |||
private static async createListAcceptedFollowForApiQuery ( | |||
type: 'followers' | 'following', | |||
actorIds: number[], | |||
@@ -518,24 +518,6 @@ export class ActorFollowModel extends Model<ActorFollowModel> { | |||
} | |||
} | |||
private static incrementScores (inboxUrls: string[], value: number, t: Sequelize.Transaction | undefined) { | |||
const inboxUrlsString = inboxUrls.map(url => `'${url}'`).join(',') | |||
const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` + | |||
'WHERE id IN (' + | |||
'SELECT "actorFollow"."id" FROM "actorFollow" ' + | |||
'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."actorId" ' + | |||
'WHERE "actor"."inboxUrl" IN (' + inboxUrlsString + ') OR "actor"."sharedInboxUrl" IN (' + inboxUrlsString + ')' + | |||
')' | |||
const options = t ? { | |||
type: Sequelize.QueryTypes.BULKUPDATE, | |||
transaction: t | |||
} : undefined | |||
return ActorFollowModel.sequelize.query(query, options) | |||
} | |||
private static listBadActorFollows () { | |||
const query = { | |||
where: { | |||