@@ -387,6 +387,11 @@ async function updateVideo (req: express.Request, res: express.Response) { | |||
function getVideo (req: express.Request, res: express.Response) { | |||
const videoInstance = res.locals.video | |||
if (videoInstance.isOutdated()) { | |||
JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', videoUrl: videoInstance.url } }) | |||
.catch(err => logger.error('Cannot create AP refresher job for video %s.', videoInstance.url, { err })) | |||
} | |||
return res.json(videoInstance.toFormattedDetailsJSON()) | |||
} | |||
@@ -429,7 +434,7 @@ async function getVideoDescription (req: express.Request, res: express.Response) | |||
return res.json({ description }) | |||
} | |||
async function listVideos (req: express.Request, res: express.Response, next: express.NextFunction) { | |||
async function listVideos (req: express.Request, res: express.Response) { | |||
const resultList = await VideoModel.listForApi({ | |||
start: req.query.start, | |||
count: req.query.count, | |||
@@ -102,7 +102,8 @@ const JOB_ATTEMPTS: { [ id in JobType ]: number } = { | |||
'video-file': 1, | |||
'video-import': 1, | |||
'email': 5, | |||
'videos-views': 1 | |||
'videos-views': 1, | |||
'activitypub-refresher': 1 | |||
} | |||
const JOB_CONCURRENCY: { [ id in JobType ]: number } = { | |||
'activitypub-http-broadcast': 1, | |||
@@ -113,7 +114,8 @@ const JOB_CONCURRENCY: { [ id in JobType ]: number } = { | |||
'video-file': 1, | |||
'video-import': 1, | |||
'email': 5, | |||
'videos-views': 1 | |||
'videos-views': 1, | |||
'activitypub-refresher': 1 | |||
} | |||
const JOB_TTL: { [ id in JobType ]: number } = { | |||
'activitypub-http-broadcast': 60000 * 10, // 10 minutes | |||
@@ -124,11 +126,12 @@ const JOB_TTL: { [ id in JobType ]: number } = { | |||
'video-file': 1000 * 3600 * 48, // 2 days, transcoding could be long | |||
'video-import': 1000 * 3600 * 2, // hours | |||
'email': 60000 * 10, // 10 minutes | |||
'videos-views': undefined // Unlimited | |||
'videos-views': undefined, // Unlimited | |||
'activitypub-refresher': 60000 * 10 // 10 minutes | |||
} | |||
const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { | |||
'videos-views': { | |||
cron: '1 * * * *' // At 1 minutes past the hour | |||
cron: '1 * * * *' // At 1 minute past the hour | |||
} | |||
} | |||
@@ -543,7 +546,7 @@ const HTTP_SIGNATURE = { | |||
// --------------------------------------------------------------------------- | |||
const PRIVATE_RSA_KEY_SIZE = 2048 | |||
let PRIVATE_RSA_KEY_SIZE = 2048 | |||
// Password encryption | |||
const BCRYPT_SALT_SIZE = 10 | |||
@@ -647,6 +650,8 @@ const TRACKER_RATE_LIMITS = { | |||
// Special constants for a test instance | |||
if (isTestInstance() === true) { | |||
PRIVATE_RSA_KEY_SIZE = 1024 | |||
ACTOR_FOLLOW_SCORE.BASE = 20 | |||
REMOTE_SCHEME.HTTP = 'http' | |||
@@ -59,7 +59,6 @@ async function processUpdateVideo (actor: ActorModel, activity: ActivityUpdate) | |||
videoObject, | |||
account: actor.Account, | |||
channel: channelActor.VideoChannel, | |||
updateViews: true, | |||
overrideTo: activity.to | |||
} | |||
return updateVideoFromAP(updateOptions) | |||
@@ -117,7 +117,7 @@ type SyncParam = { | |||
shares: boolean | |||
comments: boolean | |||
thumbnail: boolean | |||
refreshVideo: boolean | |||
refreshVideo?: boolean | |||
} | |||
async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: VideoTorrentObject, syncParam: SyncParam) { | |||
logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid) | |||
@@ -158,13 +158,11 @@ async function syncVideoExternalAttributes (video: VideoModel, fetchedVideo: Vid | |||
async function getOrCreateVideoAndAccountAndChannel (options: { | |||
videoObject: VideoTorrentObject | string, | |||
syncParam?: SyncParam, | |||
fetchType?: VideoFetchByUrlType, | |||
refreshViews?: boolean | |||
fetchType?: VideoFetchByUrlType | |||
}) { | |||
// Default params | |||
const syncParam = options.syncParam || { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true, refreshVideo: false } | |||
const fetchType = options.fetchType || 'all' | |||
const refreshViews = options.refreshViews || false | |||
// Get video url | |||
const videoUrl = getAPUrl(options.videoObject) | |||
@@ -174,11 +172,11 @@ async function getOrCreateVideoAndAccountAndChannel (options: { | |||
const refreshOptions = { | |||
video: videoFromDatabase, | |||
fetchedType: fetchType, | |||
syncParam, | |||
refreshViews | |||
syncParam | |||
} | |||
const p = refreshVideoIfNeeded(refreshOptions) | |||
if (syncParam.refreshVideo === true) videoFromDatabase = await p | |||
if (syncParam.refreshVideo === true) videoFromDatabase = await refreshVideoIfNeeded(refreshOptions) | |||
else await JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', videoUrl: videoFromDatabase.url } }) | |||
return { video: videoFromDatabase } | |||
} | |||
@@ -199,7 +197,6 @@ async function updateVideoFromAP (options: { | |||
videoObject: VideoTorrentObject, | |||
account: AccountModel, | |||
channel: VideoChannelModel, | |||
updateViews: boolean, | |||
overrideTo?: string[] | |||
}) { | |||
logger.debug('Updating remote video "%s".', options.videoObject.uuid) | |||
@@ -238,8 +235,8 @@ async function updateVideoFromAP (options: { | |||
options.video.set('publishedAt', videoData.publishedAt) | |||
options.video.set('privacy', videoData.privacy) | |||
options.video.set('channelId', videoData.channelId) | |||
options.video.set('views', videoData.views) | |||
if (options.updateViews === true) options.video.set('views', videoData.views) | |||
await options.video.save(sequelizeOptions) | |||
{ | |||
@@ -297,8 +294,58 @@ async function updateVideoFromAP (options: { | |||
} | |||
} | |||
async function refreshVideoIfNeeded (options: { | |||
video: VideoModel, | |||
fetchedType: VideoFetchByUrlType, | |||
syncParam: SyncParam | |||
}): Promise<VideoModel> { | |||
if (!options.video.isOutdated()) return options.video | |||
// We need more attributes if the argument video was fetched with not enough joints | |||
const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url) | |||
try { | |||
const { response, videoObject } = await fetchRemoteVideo(video.url) | |||
if (response.statusCode === 404) { | |||
logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url) | |||
// Video does not exist anymore | |||
await video.destroy() | |||
return undefined | |||
} | |||
if (videoObject === undefined) { | |||
logger.warn('Cannot refresh remote video %s: invalid body.', video.url) | |||
await video.setAsRefreshed() | |||
return video | |||
} | |||
const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject) | |||
const account = await AccountModel.load(channelActor.VideoChannel.accountId) | |||
const updateOptions = { | |||
video, | |||
videoObject, | |||
account, | |||
channel: channelActor.VideoChannel | |||
} | |||
await retryTransactionWrapper(updateVideoFromAP, updateOptions) | |||
await syncVideoExternalAttributes(video, videoObject, options.syncParam) | |||
return video | |||
} catch (err) { | |||
logger.warn('Cannot refresh video %s.', options.video.url, { err }) | |||
// Don't refresh in loop | |||
await video.setAsRefreshed() | |||
return video | |||
} | |||
} | |||
export { | |||
updateVideoFromAP, | |||
refreshVideoIfNeeded, | |||
federateVideoIfNeeded, | |||
fetchRemoteVideo, | |||
getOrCreateVideoAndAccountAndChannel, | |||
@@ -362,52 +409,6 @@ async function createVideo (videoObject: VideoTorrentObject, channelActor: Actor | |||
return videoCreated | |||
} | |||
async function refreshVideoIfNeeded (options: { | |||
video: VideoModel, | |||
fetchedType: VideoFetchByUrlType, | |||
syncParam: SyncParam, | |||
refreshViews: boolean | |||
}): Promise<VideoModel> { | |||
if (!options.video.isOutdated()) return options.video | |||
// We need more attributes if the argument video was fetched with not enough joints | |||
const video = options.fetchedType === 'all' ? options.video : await VideoModel.loadByUrlAndPopulateAccount(options.video.url) | |||
try { | |||
const { response, videoObject } = await fetchRemoteVideo(video.url) | |||
if (response.statusCode === 404) { | |||
logger.info('Cannot refresh remote video %s: video does not exist anymore. Deleting it.', video.url) | |||
// Video does not exist anymore | |||
await video.destroy() | |||
return undefined | |||
} | |||
if (videoObject === undefined) { | |||
logger.warn('Cannot refresh remote video %s: invalid body.', video.url) | |||
return video | |||
} | |||
const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject) | |||
const account = await AccountModel.load(channelActor.VideoChannel.accountId) | |||
const updateOptions = { | |||
video, | |||
videoObject, | |||
account, | |||
channel: channelActor.VideoChannel, | |||
updateViews: options.refreshViews | |||
} | |||
await retryTransactionWrapper(updateVideoFromAP, updateOptions) | |||
await syncVideoExternalAttributes(video, videoObject, options.syncParam) | |||
return video | |||
} catch (err) { | |||
logger.warn('Cannot refresh video %s.', options.video.url, { err }) | |||
return video | |||
} | |||
} | |||
async function videoActivityObjectToDBAttributes ( | |||
videoChannel: VideoChannelModel, | |||
videoObject: VideoTorrentObject, | |||
@@ -0,0 +1,40 @@ | |||
import * as Bull from 'bull' | |||
import { logger } from '../../../helpers/logger' | |||
import { fetchVideoByUrl } from '../../../helpers/video' | |||
import { refreshVideoIfNeeded } from '../../activitypub' | |||
export type RefreshPayload = { | |||
videoUrl: string | |||
type: 'video' | |||
} | |||
async function refreshAPObject (job: Bull.Job) { | |||
const payload = job.data as RefreshPayload | |||
logger.info('Processing AP refresher in job %d.', job.id) | |||
if (payload.type === 'video') return refreshAPVideo(payload.videoUrl) | |||
} | |||
// --------------------------------------------------------------------------- | |||
export { | |||
refreshAPObject | |||
} | |||
// --------------------------------------------------------------------------- | |||
async function refreshAPVideo (videoUrl: string) { | |||
const fetchType = 'all' as 'all' | |||
const syncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true } | |||
const videoFromDatabase = await fetchVideoByUrl(videoUrl, fetchType) | |||
if (videoFromDatabase) { | |||
const refreshOptions = { | |||
video: videoFromDatabase, | |||
fetchedType: fetchType, | |||
syncParam | |||
} | |||
await refreshVideoIfNeeded(refreshOptions) | |||
} | |||
} |
@@ -11,6 +11,7 @@ import { processVideoFile, processVideoFileImport, VideoFileImportPayload, Video | |||
import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' | |||
import { processVideoImport, VideoImportPayload } from './handlers/video-import' | |||
import { processVideosViews } from './handlers/video-views' | |||
import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher' | |||
type CreateJobArgument = | |||
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | |||
@@ -21,6 +22,7 @@ type CreateJobArgument = | |||
{ type: 'video-file', payload: VideoFilePayload } | | |||
{ type: 'email', payload: EmailPayload } | | |||
{ type: 'video-import', payload: VideoImportPayload } | | |||
{ type: 'activitypub-refresher', payload: RefreshPayload } | | |||
{ type: 'videos-views', payload: {} } | |||
const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | |||
@@ -32,7 +34,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | |||
'video-file': processVideoFile, | |||
'email': processEmail, | |||
'video-import': processVideoImport, | |||
'videos-views': processVideosViews | |||
'videos-views': processVideosViews, | |||
'activitypub-refresher': refreshAPObject | |||
} | |||
const jobTypes: JobType[] = [ | |||
@@ -44,7 +47,8 @@ const jobTypes: JobType[] = [ | |||
'video-file', | |||
'video-file-import', | |||
'video-import', | |||
'videos-views' | |||
'videos-views', | |||
'activitypub-refresher' | |||
] | |||
class JobQueue { | |||
@@ -1561,6 +1561,12 @@ export class VideoModel extends Model<VideoModel> { | |||
(now - updatedAtTime) > ACTIVITY_PUB.VIDEO_REFRESH_INTERVAL | |||
} | |||
setAsRefreshed () { | |||
this.changed('updatedAt', true) | |||
return this.save() | |||
} | |||
getBaseUrls () { | |||
let baseUrlHttp | |||
let baseUrlWs | |||
@@ -1,4 +1,5 @@ | |||
import './client' | |||
import './fetch' | |||
import './helpers' | |||
import './refresher' | |||
import './security' |
@@ -0,0 +1,84 @@ | |||
/* tslint:disable:no-unused-expression */ | |||
import 'mocha' | |||
import { doubleFollow, getVideo, reRunServer } from '../../utils' | |||
import { flushAndRunMultipleServers, killallServers, ServerInfo, setAccessTokensToServers, uploadVideo, wait } from '../../utils/index' | |||
import { waitJobs } from '../../utils/server/jobs' | |||
import { setVideoField } from '../../utils/miscs/sql' | |||
describe('Test AP refresher', function () { | |||
let servers: ServerInfo[] = [] | |||
let videoUUID1: string | |||
let videoUUID2: string | |||
let videoUUID3: string | |||
before(async function () { | |||
this.timeout(30000) | |||
servers = await flushAndRunMultipleServers(2) | |||
// Get the access tokens | |||
await setAccessTokensToServers(servers) | |||
{ | |||
const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video1' }) | |||
videoUUID1 = res.body.video.uuid | |||
} | |||
{ | |||
const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video2' }) | |||
videoUUID2 = res.body.video.uuid | |||
} | |||
{ | |||
const res = await uploadVideo(servers[1].url, servers[1].accessToken, { name: 'video3' }) | |||
videoUUID3 = res.body.video.uuid | |||
} | |||
await doubleFollow(servers[0], servers[1]) | |||
}) | |||
it('Should remove a deleted remote video', async function () { | |||
this.timeout(60000) | |||
await wait(10000) | |||
// Change UUID so the remote server returns a 404 | |||
await setVideoField(2, videoUUID1, 'uuid', '304afe4f-39f9-4d49-8ed7-ac57b86b174f') | |||
await getVideo(servers[0].url, videoUUID1) | |||
await getVideo(servers[0].url, videoUUID2) | |||
await waitJobs(servers) | |||
await getVideo(servers[0].url, videoUUID1, 404) | |||
await getVideo(servers[0].url, videoUUID2, 200) | |||
}) | |||
it('Should not update a remote video if the remote instance is down', async function () { | |||
this.timeout(60000) | |||
killallServers([ servers[1] ]) | |||
await setVideoField(2, videoUUID3, 'uuid', '304afe4f-39f9-4d49-8ed7-ac57b86b174e') | |||
// Video will need a refresh | |||
await wait(10000) | |||
await getVideo(servers[0].url, videoUUID3) | |||
// The refresh should fail | |||
await waitJobs([ servers[0] ]) | |||
await reRunServer(servers[1]) | |||
// Should not refresh the video, even if the last refresh failed (to avoir a loop on dead instances) | |||
await getVideo(servers[0].url, videoUUID3) | |||
await waitJobs(servers) | |||
await getVideo(servers[0].url, videoUUID3, 200) | |||
}) | |||
after(async function () { | |||
killallServers(servers) | |||
}) | |||
}) |
@@ -8,7 +8,8 @@ export type JobType = 'activitypub-http-unicast' | | |||
'video-file' | | |||
'email' | | |||
'video-import' | | |||
'videos-views' | |||
'videos-views' | | |||
'activitypub-refresher' | |||
export interface Job { | |||
id: number | |||