@@ -11,6 +11,6 @@ | |||
* **What do you see instead?** | |||
* **Browser console log if useful (Gist/Pastebin...):** | |||
* **Server log if useful (Gist/Pastebin...):** | |||
* **Browser console log if useful:** | |||
* **Server log if useful (journalctl or /var/www/peertube/storage/logs):** | |||
@@ -380,14 +380,16 @@ async function viewVideo (req: express.Request, res: express.Response) { | |||
const videoInstance = res.locals.video | |||
const ip = req.ip | |||
const exists = await Redis.Instance.isViewExists(ip, videoInstance.uuid) | |||
const exists = await Redis.Instance.isVideoIPViewExists(ip, videoInstance.uuid) | |||
if (exists) { | |||
logger.debug('View for ip %s and video %s already exists.', ip, videoInstance.uuid) | |||
return res.status(204).end() | |||
} | |||
await videoInstance.increment('views') | |||
await Redis.Instance.setView(ip, videoInstance.uuid) | |||
await Promise.all([ | |||
Redis.Instance.addVideoView(videoInstance.id), | |||
Redis.Instance.setIPVideoView(ip, videoInstance.uuid) | |||
]) | |||
const serverAccount = await getServerActor() | |||
@@ -3,11 +3,12 @@ import { dirname, join } from 'path' | |||
import { JobType, VideoRateType, VideoState } from '../../shared/models' | |||
import { ActivityPubActorType } from '../../shared/models/activitypub' | |||
import { FollowState } from '../../shared/models/actors' | |||
import { VideoPrivacy, VideoAbuseState, VideoImportState } from '../../shared/models/videos' | |||
import { VideoAbuseState, VideoImportState, VideoPrivacy } from '../../shared/models/videos' | |||
// Do not use barrels, remain constants as independent as possible | |||
import { buildPath, isTestInstance, root, sanitizeHost, sanitizeUrl } from '../helpers/core-utils' | |||
import { NSFWPolicyType } from '../../shared/models/videos/nsfw-policy.type' | |||
import { invert } from 'lodash' | |||
import { CronRepeatOptions, EveryRepeatOptions } from 'bull' | |||
// Use a variable to reload the configuration if we need | |||
let config: IConfig = require('config') | |||
@@ -90,7 +91,8 @@ const JOB_ATTEMPTS: { [ id in JobType ]: number } = { | |||
'video-file-import': 1, | |||
'video-file': 1, | |||
'video-import': 1, | |||
'email': 5 | |||
'email': 5, | |||
'videos-views': 1 | |||
} | |||
const JOB_CONCURRENCY: { [ id in JobType ]: number } = { | |||
'activitypub-http-broadcast': 1, | |||
@@ -100,7 +102,8 @@ const JOB_CONCURRENCY: { [ id in JobType ]: number } = { | |||
'video-file-import': 1, | |||
'video-file': 1, | |||
'video-import': 1, | |||
'email': 5 | |||
'email': 5, | |||
'videos-views': 1 | |||
} | |||
const JOB_TTL: { [ id in JobType ]: number } = { | |||
'activitypub-http-broadcast': 60000 * 10, // 10 minutes | |||
@@ -110,8 +113,15 @@ const JOB_TTL: { [ id in JobType ]: number } = { | |||
'video-file-import': 1000 * 3600, // 1 hour | |||
'video-file': 1000 * 3600 * 48, // 2 days, transcoding could be long | |||
'video-import': 1000 * 3600 * 5, // 5 hours | |||
'email': 60000 * 10 // 10 minutes | |||
'email': 60000 * 10, // 10 minutes | |||
'videos-views': undefined // Unlimited | |||
} | |||
const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { | |||
'videos-views': { | |||
cron: '1 * * * *' // At 1 minutes past the hour | |||
} | |||
} | |||
const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job | |||
const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...) | |||
const JOB_REQUEST_TIMEOUT = 3000 // 3 seconds | |||
@@ -591,6 +601,7 @@ if (isTestInstance() === true) { | |||
SCHEDULER_INTERVALS_MS.badActorFollow = 10000 | |||
SCHEDULER_INTERVALS_MS.removeOldJobs = 10000 | |||
SCHEDULER_INTERVALS_MS.updateVideos = 5000 | |||
REPEAT_JOBS['videos-views'] = { every: 5000 } | |||
VIDEO_VIEW_LIFETIME = 1000 // 1 second | |||
@@ -652,6 +663,7 @@ export { | |||
USER_PASSWORD_RESET_LIFETIME, | |||
IMAGE_MIMETYPE_EXT, | |||
SCHEDULER_INTERVALS_MS, | |||
REPEAT_JOBS, | |||
STATIC_DOWNLOAD_PATHS, | |||
RATES_LIMIT, | |||
VIDEO_EXT_MIMETYPE, | |||
@@ -25,6 +25,7 @@ import { CONFIG } from './constants' | |||
import { ScheduleVideoUpdateModel } from '../models/video/schedule-video-update' | |||
import { VideoCaptionModel } from '../models/video/video-caption' | |||
import { VideoImportModel } from '../models/video/video-import' | |||
import { VideoViewModel } from '../models/video/video-views' | |||
require('pg').defaults.parseInt8 = true // Avoid BIGINT to be converted to string | |||
@@ -83,7 +84,8 @@ async function initDatabaseModels (silent: boolean) { | |||
VideoModel, | |||
VideoCommentModel, | |||
ScheduleVideoUpdateModel, | |||
VideoImportModel | |||
VideoImportModel, | |||
VideoViewModel | |||
]) | |||
// Check extensions exist in the database | |||
@@ -7,11 +7,11 @@ import { sequelizeTypescript } from '../../../initializers' | |||
import { AccountVideoRateModel } from '../../../models/account/account-video-rate' | |||
import { ActorModel } from '../../../models/activitypub/actor' | |||
import { VideoAbuseModel } from '../../../models/video/video-abuse' | |||
import { VideoCommentModel } from '../../../models/video/video-comment' | |||
import { getOrCreateActorAndServerAndModel } from '../actor' | |||
import { addVideoComment, resolveThread } from '../video-comments' | |||
import { getOrCreateVideoAndAccountAndChannel } from '../videos' | |||
import { forwardActivity, forwardVideoRelatedActivity } from '../send/utils' | |||
import { Redis } from '../../redis' | |||
async function processCreateActivity (activity: ActivityCreate) { | |||
const activityObject = activity.object | |||
@@ -88,7 +88,7 @@ async function processCreateView (byActor: ActorModel, activity: ActivityCreate) | |||
const actor = await ActorModel.loadByUrl(view.actor) | |||
if (!actor) throw new Error('Unknown actor ' + view.actor) | |||
await video.increment('views') | |||
await Redis.Instance.addVideoView(video.id) | |||
if (video.isOwned()) { | |||
// Don't resend the activity to the sender | |||
@@ -0,0 +1,40 @@ | |||
import { Redis } from '../../redis' | |||
import { logger } from '../../../helpers/logger' | |||
import { VideoModel } from '../../../models/video/video' | |||
import { VideoViewModel } from '../../../models/video/video-views' | |||
async function processVideosViewsViews () { | |||
const hour = new Date().getHours() | |||
const startDate = new Date().setMinutes(0, 0, 0) | |||
const endDate = new Date().setMinutes(59, 59, 999) | |||
const videoIds = await Redis.Instance.getVideosIdViewed(hour) | |||
if (videoIds.length === 0) return | |||
logger.info('Processing videos views in job for hour %d.', hour) | |||
for (const videoId of videoIds) { | |||
const views = await Redis.Instance.getVideoViews(videoId, hour) | |||
if (isNaN(views)) { | |||
logger.error('Cannot process videos views of video %s in hour %d: views number is NaN.', videoId, hour) | |||
} else { | |||
logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | |||
await VideoModel.incrementViews(videoId, views) | |||
await VideoViewModel.create({ | |||
startDate, | |||
endDate, | |||
views, | |||
videoId | |||
}) | |||
} | |||
await Redis.Instance.deleteVideoViews(videoId, hour) | |||
} | |||
} | |||
// --------------------------------------------------------------------------- | |||
export { | |||
processVideosViewsViews | |||
} |
@@ -2,7 +2,7 @@ import * as Bull from 'bull' | |||
import { JobState, JobType } from '../../../shared/models' | |||
import { logger } from '../../helpers/logger' | |||
import { Redis } from '../redis' | |||
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers' | |||
import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS } from '../../initializers' | |||
import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | |||
import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | |||
import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | |||
@@ -10,6 +10,7 @@ import { EmailPayload, processEmail } from './handlers/email' | |||
import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' | |||
import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' | |||
import { processVideoImport, VideoImportPayload } from './handlers/video-import' | |||
import { processVideosViewsViews } from './handlers/video-views' | |||
type CreateJobArgument = | |||
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | |||
@@ -19,7 +20,8 @@ type CreateJobArgument = | |||
{ type: 'video-file-import', payload: VideoFileImportPayload } | | |||
{ type: 'video-file', payload: VideoFilePayload } | | |||
{ type: 'email', payload: EmailPayload } | | |||
{ type: 'video-import', payload: VideoImportPayload } | |||
{ type: 'video-import', payload: VideoImportPayload } | | |||
{ type: 'videos-views', payload: {} } | |||
const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | |||
'activitypub-http-broadcast': processActivityPubHttpBroadcast, | |||
@@ -29,7 +31,8 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | |||
'video-file-import': processVideoFileImport, | |||
'video-file': processVideoFile, | |||
'email': processEmail, | |||
'video-import': processVideoImport | |||
'video-import': processVideoImport, | |||
'videos-views': processVideosViewsViews | |||
} | |||
const jobTypes: JobType[] = [ | |||
@@ -40,7 +43,8 @@ const jobTypes: JobType[] = [ | |||
'email', | |||
'video-file', | |||
'video-file-import', | |||
'video-import' | |||
'video-import', | |||
'videos-views' | |||
] | |||
class JobQueue { | |||
@@ -85,6 +89,8 @@ class JobQueue { | |||
this.queues[handlerName] = queue | |||
} | |||
this.addRepeatableJobs() | |||
} | |||
terminate () { | |||
@@ -163,6 +169,12 @@ class JobQueue { | |||
} | |||
} | |||
private addRepeatableJobs () { | |||
this.queues['videos-views'].add({}, { | |||
repeat: REPEAT_JOBS['videos-views'] | |||
}) | |||
} | |||
static get Instance () { | |||
return this.instance || (this.instance = new this()) | |||
} | |||
@@ -60,11 +60,11 @@ class Redis { | |||
return this.getValue(this.generateResetPasswordKey(userId)) | |||
} | |||
setView (ip: string, videoUUID: string) { | |||
setIPVideoView (ip: string, videoUUID: string) { | |||
return this.setValue(this.buildViewKey(ip, videoUUID), '1', VIDEO_VIEW_LIFETIME) | |||
} | |||
async isViewExists (ip: string, videoUUID: string) { | |||
async isVideoIPViewExists (ip: string, videoUUID: string) { | |||
return this.exists(this.buildViewKey(ip, videoUUID)) | |||
} | |||
@@ -85,6 +85,52 @@ class Redis { | |||
return this.setObject(this.buildCachedRouteKey(req), cached, lifetime) | |||
} | |||
addVideoView (videoId: number) { | |||
const keyIncr = this.generateVideoViewKey(videoId) | |||
const keySet = this.generateVideosViewKey() | |||
return Promise.all([ | |||
this.addToSet(keySet, videoId.toString()), | |||
this.increment(keyIncr) | |||
]) | |||
} | |||
async getVideoViews (videoId: number, hour: number) { | |||
const key = this.generateVideoViewKey(videoId, hour) | |||
const valueString = await this.getValue(key) | |||
return parseInt(valueString, 10) | |||
} | |||
async getVideosIdViewed (hour: number) { | |||
const key = this.generateVideosViewKey(hour) | |||
const stringIds = await this.getSet(key) | |||
return stringIds.map(s => parseInt(s, 10)) | |||
} | |||
deleteVideoViews (videoId: number, hour: number) { | |||
const keySet = this.generateVideosViewKey(hour) | |||
const keyIncr = this.generateVideoViewKey(videoId, hour) | |||
return Promise.all([ | |||
this.deleteFromSet(keySet, videoId.toString()), | |||
this.deleteKey(keyIncr) | |||
]) | |||
} | |||
generateVideosViewKey (hour?: number) { | |||
if (!hour) hour = new Date().getHours() | |||
return `videos-view-h${hour}` | |||
} | |||
generateVideoViewKey (videoId: number, hour?: number) { | |||
if (!hour) hour = new Date().getHours() | |||
return `video-view-${videoId}-h${hour}` | |||
} | |||
generateResetPasswordKey (userId: number) { | |||
return 'reset-password-' + userId | |||
} | |||
@@ -107,6 +153,34 @@ class Redis { | |||
}) | |||
} | |||
private getSet (key: string) { | |||
return new Promise<string[]>((res, rej) => { | |||
this.client.smembers(this.prefix + key, (err, value) => { | |||
if (err) return rej(err) | |||
return res(value) | |||
}) | |||
}) | |||
} | |||
private addToSet (key: string, value: string) { | |||
return new Promise<string[]>((res, rej) => { | |||
this.client.sadd(this.prefix + key, value, err => err ? rej(err) : res()) | |||
}) | |||
} | |||
private deleteFromSet (key: string, value: string) { | |||
return new Promise<void>((res, rej) => { | |||
this.client.srem(this.prefix + key, value, err => err ? rej(err) : res()) | |||
}) | |||
} | |||
private deleteKey (key: string) { | |||
return new Promise<void>((res, rej) => { | |||
this.client.del(this.prefix + key, err => err ? rej(err) : res()) | |||
}) | |||
} | |||
private setValue (key: string, value: string, expirationMilliseconds: number) { | |||
return new Promise<void>((res, rej) => { | |||
this.client.set(this.prefix + key, value, 'PX', expirationMilliseconds, (err, ok) => { | |||
@@ -145,6 +219,16 @@ class Redis { | |||
}) | |||
} | |||
private increment (key: string) { | |||
return new Promise<number>((res, rej) => { | |||
this.client.incr(this.prefix + key, (err, value) => { | |||
if (err) return rej(err) | |||
return res(value) | |||
}) | |||
}) | |||
} | |||
private exists (key: string) { | |||
return new Promise<boolean>((res, rej) => { | |||
this.client.exists(this.prefix + key, (err, existsNumber) => { | |||
@@ -0,0 +1,41 @@ | |||
import { AllowNull, BelongsTo, Column, CreatedAt, ForeignKey, Model, Table } from 'sequelize-typescript' | |||
import { VideoModel } from './video' | |||
import * as Sequelize from 'sequelize' | |||
@Table({ | |||
tableName: 'videoView', | |||
indexes: [ | |||
{ | |||
fields: [ 'videoId' ] | |||
} | |||
] | |||
}) | |||
export class VideoViewModel extends Model<VideoViewModel> { | |||
@CreatedAt | |||
createdAt: Date | |||
@AllowNull(false) | |||
@Column(Sequelize.DATE) | |||
startDate: Date | |||
@AllowNull(false) | |||
@Column(Sequelize.DATE) | |||
endDate: Date | |||
@AllowNull(false) | |||
@Column | |||
views: number | |||
@ForeignKey(() => VideoModel) | |||
@Column | |||
videoId: number | |||
@BelongsTo(() => VideoModel, { | |||
foreignKey: { | |||
allowNull: false | |||
}, | |||
onDelete: 'CASCADE' | |||
}) | |||
Video: VideoModel | |||
} |
@@ -1074,6 +1074,15 @@ export class VideoModel extends Model<VideoModel> { | |||
} | |||
} | |||
static incrementViews (id: number, views: number) { | |||
return VideoModel.increment('views', { | |||
by: views, | |||
where: { | |||
id | |||
} | |||
}) | |||
} | |||
private static buildActorWhereWithFilter (filter?: VideoFilter) { | |||
if (filter && filter === 'local') { | |||
return { | |||
@@ -4,7 +4,18 @@ import 'mocha' | |||
import * as chai from 'chai' | |||
import { About } from '../../../../shared/models/server/about.model' | |||
import { CustomConfig } from '../../../../shared/models/server/custom-config.model' | |||
import { deleteCustomConfig, getAbout, getVideo, killallServers, login, reRunServer, uploadVideo, userLogin, viewVideo } from '../../utils' | |||
import { | |||
deleteCustomConfig, | |||
getAbout, | |||
getVideo, | |||
killallServers, | |||
login, | |||
reRunServer, | |||
uploadVideo, | |||
userLogin, | |||
viewVideo, | |||
wait | |||
} from '../../utils' | |||
const expect = chai.expect | |||
import { | |||
@@ -30,33 +41,53 @@ describe('Test application behind a reverse proxy', function () { | |||
}) | |||
it('Should view a video only once with the same IP by default', async function () { | |||
this.timeout(20000) | |||
await viewVideo(server.url, videoId) | |||
await viewVideo(server.url, videoId) | |||
// Wait the repeatable job | |||
await wait(8000) | |||
const { body } = await getVideo(server.url, videoId) | |||
expect(body.views).to.equal(1) | |||
}) | |||
it('Should view a video 2 times with the X-Forwarded-For header set', async function () { | |||
this.timeout(20000) | |||
await viewVideo(server.url, videoId, 204, '0.0.0.1,127.0.0.1') | |||
await viewVideo(server.url, videoId, 204, '0.0.0.2,127.0.0.1') | |||
// Wait the repeatable job | |||
await wait(8000) | |||
const { body } = await getVideo(server.url, videoId) | |||
expect(body.views).to.equal(3) | |||
}) | |||
it('Should view a video only once with the same client IP in the X-Forwarded-For header', async function () { | |||
this.timeout(20000) | |||
await viewVideo(server.url, videoId, 204, '0.0.0.4,0.0.0.3,::ffff:127.0.0.1') | |||
await viewVideo(server.url, videoId, 204, '0.0.0.5,0.0.0.3,127.0.0.1') | |||
// Wait the repeatable job | |||
await wait(8000) | |||
const { body } = await getVideo(server.url, videoId) | |||
expect(body.views).to.equal(4) | |||
}) | |||
it('Should view a video two times with a different client IP in the X-Forwarded-For header', async function () { | |||
this.timeout(20000) | |||
await viewVideo(server.url, videoId, 204, '0.0.0.8,0.0.0.6,127.0.0.1') | |||
await viewVideo(server.url, videoId, 204, '0.0.0.8,0.0.0.7,127.0.0.1') | |||
// Wait the repeatable job | |||
await wait(8000) | |||
const { body } = await getVideo(server.url, videoId) | |||
expect(body.views).to.equal(6) | |||
}) | |||
@@ -46,6 +46,9 @@ describe('Test stats', function () { | |||
await viewVideo(servers[0].url, videoUUID) | |||
// Wait the video views repeatable job | |||
await wait(8000) | |||
await follow(servers[2].url, [ servers[0].url ], servers[2].accessToken) | |||
await waitJobs(servers) | |||
}) | |||
@@ -492,7 +492,7 @@ describe('Test multiple servers', function () { | |||
}) | |||
it('Should view multiple videos on owned servers', async function () { | |||
this.timeout(15000) | |||
this.timeout(30000) | |||
const tasks: Promise<any>[] = [] | |||
await viewVideo(servers[2].url, localVideosServer3[0]) | |||
@@ -511,6 +511,9 @@ describe('Test multiple servers', function () { | |||
await waitJobs(servers) | |||
// Wait the repeatable job | |||
await wait(6000) | |||
for (const server of servers) { | |||
const res = await getVideosList(server.url) | |||
@@ -524,7 +527,7 @@ describe('Test multiple servers', function () { | |||
}) | |||
it('Should view multiple videos on each servers', async function () { | |||
this.timeout(15000) | |||
this.timeout(30000) | |||
const tasks: Promise<any>[] = [] | |||
tasks.push(viewVideo(servers[0].url, remoteVideosServer1[0])) | |||
@@ -542,6 +545,9 @@ describe('Test multiple servers', function () { | |||
await waitJobs(servers) | |||
// Wait the repeatable job | |||
await wait(8000) | |||
let baseVideos = null | |||
for (const server of servers) { | |||
@@ -196,7 +196,7 @@ describe('Test a single server', function () { | |||
}) | |||
it('Should have the views updated', async function () { | |||
this.timeout(10000) | |||
this.timeout(20000) | |||
await viewVideo(server.url, videoId) | |||
await viewVideo(server.url, videoId) | |||
@@ -212,6 +212,9 @@ describe('Test a single server', function () { | |||
await viewVideo(server.url, videoId) | |||
await viewVideo(server.url, videoId) | |||
// Wait the repeatable job | |||
await wait(8000) | |||
const res = await getVideo(server.url, videoId) | |||
const video = res.body | |||
@@ -1,5 +1,5 @@ | |||
import * as request from 'supertest' | |||
import { JobState } from '../../../../shared/models' | |||
import { Job, JobState } from '../../../../shared/models' | |||
import { ServerInfo, wait } from '../index' | |||
function getJobsList (url: string, accessToken: string, state: JobState) { | |||
@@ -44,8 +44,10 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) { | |||
for (const server of servers) { | |||
for (const state of states) { | |||
const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt') | |||
.then(res => { | |||
if (res.body.total > 0) pendingRequests = true | |||
.then(res => res.body.data) | |||
.then((jobs: Job[]) => jobs.filter(j => j.type !== 'videos-views')) | |||
.then(jobs => { | |||
if (jobs.length !== 0) pendingRequests = true | |||
}) | |||
tasks.push(p) | |||
} | |||
@@ -7,7 +7,8 @@ export type JobType = 'activitypub-http-unicast' | | |||
'video-file-import' | | |||
'video-file' | | |||
'email' | | |||
'video-import' | |||
'video-import' | | |||
'videos-views' | |||
export interface Job { | |||
id: number | |||