* add migrate-to-object-storage-script closes #4467 * add migrate-to-unique-playlist-filenames script * fix(migrate-to-unique-playlist-filenames): update master/segments256 run updateMasterHLSPlaylist and updateSha256VODSegments after file rename. * Improve move to object storage scripts * PR remarks Co-authored-by: Chocobozzz <me@florianbigard.com>tags/v4.0.0-rc.1
@@ -47,6 +47,7 @@ | |||
"create-transcoding-job": "node ./dist/scripts/create-transcoding-job.js", | |||
"regenerate-thumbnails": "node ./dist/scripts/regenerate-thumbnails.js", | |||
"create-import-video-file-job": "node ./dist/scripts/create-import-video-file-job.js", | |||
"create-move-video-storage-job": "node ./dist/scripts/create-move-video-storage-job.js", | |||
"print-transcode-command": "node ./dist/scripts/print-transcode-command.js", | |||
"test": "bash ./scripts/test.sh", | |||
"help": "bash ./scripts/help.sh", | |||
@@ -47,7 +47,7 @@ async function run () { | |||
filePath: resolve(options.import) | |||
} | |||
JobQueue.Instance.init() | |||
JobQueue.Instance.init(true) | |||
await JobQueue.Instance.createJobWithPromise({ type: 'video-file-import', payload: dataInput }) | |||
console.log('Import job for video %s created.', video.uuid) | |||
} |
@@ -0,0 +1,86 @@ | |||
import { registerTSPaths } from '../server/helpers/register-ts-paths' | |||
registerTSPaths() | |||
import { program } from 'commander' | |||
import { VideoModel } from '@server/models/video/video' | |||
import { initDatabaseModels } from '@server/initializers/database' | |||
import { VideoStorage } from '@shared/models' | |||
import { moveToExternalStorageState } from '@server/lib/video-state' | |||
import { JobQueue } from '@server/lib/job-queue' | |||
import { CONFIG } from '@server/initializers/config' | |||
program | |||
.description('Move videos to another storage.') | |||
.option('-o, --to-object-storage', 'Move videos in object storage') | |||
.option('-v, --video [videoUUID]', 'Move a specific video') | |||
.option('-a, --all-videos', 'Migrate all videos') | |||
.parse(process.argv) | |||
const options = program.opts() | |||
if (!options['toObjectStorage']) { | |||
console.error('You need to choose where to send video files.') | |||
process.exit(-1) | |||
} | |||
if (!options['video'] && !options['allVideos']) { | |||
console.error('You need to choose which videos to move.') | |||
process.exit(-1) | |||
} | |||
if (options['toObjectStorage'] && !CONFIG.OBJECT_STORAGE.ENABLED) { | |||
console.error('Object storage is not enabled on this instance.') | |||
process.exit(-1) | |||
} | |||
run() | |||
.then(() => process.exit(0)) | |||
.catch(err => console.error(err)) | |||
async function run () { | |||
await initDatabaseModels(true) | |||
JobQueue.Instance.init(true) | |||
let ids: number[] = [] | |||
if (options['video']) { | |||
const video = await VideoModel.load(options['video']) | |||
if (!video) { | |||
console.error('Unknown video ' + options['video']) | |||
process.exit(-1) | |||
} | |||
if (video.remote === true) { | |||
console.error('Cannot process a remote video') | |||
process.exit(-1) | |||
} | |||
ids.push(video.id) | |||
} else { | |||
ids = await VideoModel.listLocalIds() | |||
} | |||
for (const id of ids) { | |||
const videoFull = await VideoModel.loadAndPopulateAccountAndServerAndTags(id) | |||
const files = videoFull.VideoFiles || [] | |||
const hls = videoFull.getHLSPlaylist() | |||
if (files.some(f => f.storage === VideoStorage.FILE_SYSTEM) || hls?.storage === VideoStorage.FILE_SYSTEM) { | |||
console.log('Processing video %s.', videoFull.name) | |||
const success = await moveToExternalStorageState(videoFull, false, undefined) | |||
if (!success) { | |||
console.error( | |||
'Cannot create move job for %s: job creation may have failed or there may be pending transcoding jobs for this video', | |||
videoFull.name | |||
) | |||
} | |||
} | |||
console.log(`Created move-to-object-storage job for ${videoFull.name}.`) | |||
} | |||
} |
@@ -91,7 +91,7 @@ async function run () { | |||
} | |||
} | |||
JobQueue.Instance.init() | |||
JobQueue.Instance.init(true) | |||
video.state = VideoState.TO_TRANSCODE | |||
await video.save() | |||
@@ -0,0 +1,107 @@ | |||
import { registerTSPaths } from '../../server/helpers/register-ts-paths' | |||
registerTSPaths() | |||
import { join } from 'path' | |||
import { JobQueue } from '@server/lib/job-queue' | |||
import { initDatabaseModels } from '../../server/initializers/database' | |||
import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getHlsResolutionPlaylistFilename } from '@server/lib/paths' | |||
import { VideoPathManager } from '@server/lib/video-path-manager' | |||
import { VideoModel } from '@server/models/video/video' | |||
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | |||
import { move, readFile, writeFile } from 'fs-extra' | |||
import Bluebird from 'bluebird' | |||
import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | |||
run() | |||
.then(() => process.exit(0)) | |||
.catch(err => { | |||
console.error(err) | |||
process.exit(-1) | |||
}) | |||
async function run () { | |||
console.log('Migrate old HLS paths to new format.') | |||
await initDatabaseModels(true) | |||
JobQueue.Instance.init(true) | |||
const ids = await VideoModel.listLocalIds() | |||
await Bluebird.map(ids, async id => { | |||
try { | |||
await processVideo(id) | |||
} catch (err) { | |||
console.error('Cannot process video %s.', { err }) | |||
} | |||
}, { concurrency: 5 }) | |||
console.log('Migration finished!') | |||
} | |||
async function processVideo (videoId: number) { | |||
const video = await VideoModel.loadWithFiles(videoId) | |||
const hls = video.getHLSPlaylist() | |||
if (!hls || hls.playlistFilename !== 'master.m3u8' || hls.VideoFiles.length === 0) { | |||
return | |||
} | |||
console.log(`Renaming HLS playlist files of video ${video.name}.`) | |||
const playlist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) | |||
const hlsDirPath = VideoPathManager.Instance.getFSHLSOutputPath(video) | |||
const masterPlaylistPath = join(hlsDirPath, playlist.playlistFilename) | |||
let masterPlaylistContent = await readFile(masterPlaylistPath, 'utf8') | |||
for (const videoFile of hls.VideoFiles) { | |||
const srcName = `${videoFile.resolution}.m3u8` | |||
const dstName = getHlsResolutionPlaylistFilename(videoFile.filename) | |||
const src = join(hlsDirPath, srcName) | |||
const dst = join(hlsDirPath, dstName) | |||
try { | |||
await move(src, dst) | |||
masterPlaylistContent = masterPlaylistContent.replace(new RegExp('^' + srcName + '$', 'm'), dstName) | |||
} catch (err) { | |||
console.error('Cannot move video file %s to %s.', src, dst, err) | |||
} | |||
} | |||
await writeFile(masterPlaylistPath, masterPlaylistContent) | |||
if (playlist.segmentsSha256Filename === 'segments-sha256.json') { | |||
try { | |||
const newName = generateHlsSha256SegmentsFilename(video.isLive) | |||
const dst = join(hlsDirPath, newName) | |||
await move(join(hlsDirPath, playlist.segmentsSha256Filename), dst) | |||
playlist.segmentsSha256Filename = newName | |||
} catch (err) { | |||
console.error(`Cannot rename ${video.name} segments-sha256.json file to a new name`, err) | |||
} | |||
} | |||
if (playlist.playlistFilename === 'master.m3u8') { | |||
try { | |||
const newName = generateHLSMasterPlaylistFilename(video.isLive) | |||
const dst = join(hlsDirPath, newName) | |||
await move(join(hlsDirPath, playlist.playlistFilename), dst) | |||
playlist.playlistFilename = newName | |||
} catch (err) { | |||
console.error(`Cannot rename ${video.name} master.m3u8 file to a new name`, err) | |||
} | |||
} | |||
// Everything worked, we can save the playlist now | |||
await playlist.save() | |||
const allVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.id) | |||
await federateVideoIfNeeded(allVideo, false) | |||
console.log(`Successfully moved HLS files of ${video.name}.`) | |||
} |
@@ -7,7 +7,6 @@ import { pathExists, remove } from 'fs-extra' | |||
import { generateImageFilename, processImage } from '@server/helpers/image-utils' | |||
import { THUMBNAILS_SIZE } from '@server/initializers/constants' | |||
import { VideoModel } from '@server/models/video/video' | |||
import { MVideo } from '@server/types/models' | |||
import { initDatabaseModels } from '@server/initializers/database' | |||
program | |||
@@ -21,16 +20,16 @@ run() | |||
async function run () { | |||
await initDatabaseModels(true) | |||
const videos = await VideoModel.listLocal() | |||
const ids = await VideoModel.listLocalIds() | |||
await map(videos, v => { | |||
return processVideo(v) | |||
.catch(err => console.error('Cannot process video %s.', v.url, err)) | |||
await map(ids, id => { | |||
return processVideo(id) | |||
.catch(err => console.error('Cannot process video %d.', id, err)) | |||
}, { concurrency: 20 }) | |||
} | |||
async function processVideo (videoArg: MVideo) { | |||
const video = await VideoModel.loadWithFiles(videoArg.id) | |||
async function processVideo (id: number) { | |||
const video = await VideoModel.loadWithFiles(id) | |||
console.log('Processing video %s.', video.name) | |||
@@ -115,9 +115,9 @@ async function run () { | |||
console.log('Updating video and torrent files.') | |||
const localVideos = await VideoModel.listLocal() | |||
for (const localVideo of localVideos) { | |||
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(localVideo.id) | |||
const ids = await VideoModel.listLocalIds() | |||
for (const id of ids) { | |||
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(id) | |||
console.log('Updating video ' + video.uuid) | |||
@@ -1,7 +1,7 @@ | |||
import { close, ensureDir, move, open, outputJSON, read, readFile, remove, stat, writeFile } from 'fs-extra' | |||
import { flatten, uniq } from 'lodash' | |||
import { basename, dirname, join } from 'path' | |||
import { MStreamingPlaylistFilesVideo, MVideoWithFile } from '@server/types/models' | |||
import { MStreamingPlaylistFilesVideo, MVideo, MVideoUUID } from '@server/types/models' | |||
import { sha256 } from '../helpers/core-utils' | |||
import { getAudioStreamCodec, getVideoStreamCodec, getVideoStreamSize } from '../helpers/ffprobe-utils' | |||
import { logger } from '../helpers/logger' | |||
@@ -31,7 +31,7 @@ async function updateStreamingPlaylistsInfohashesIfNeeded () { | |||
} | |||
} | |||
async function updateMasterHLSPlaylist (video: MVideoWithFile, playlist: MStreamingPlaylistFilesVideo) { | |||
async function updateMasterHLSPlaylist (video: MVideo, playlist: MStreamingPlaylistFilesVideo) { | |||
const masterPlaylists: string[] = [ '#EXTM3U', '#EXT-X-VERSION:3' ] | |||
for (const file of playlist.VideoFiles) { | |||
@@ -63,7 +63,7 @@ async function updateMasterHLSPlaylist (video: MVideoWithFile, playlist: MStream | |||
}) | |||
} | |||
async function updateSha256VODSegments (video: MVideoWithFile, playlist: MStreamingPlaylistFilesVideo) { | |||
async function updateSha256VODSegments (video: MVideoUUID, playlist: MStreamingPlaylistFilesVideo) { | |||
const json: { [filename: string]: { [range: string]: string } } = {} | |||
// For all the resolutions available for this video | |||
@@ -108,7 +108,7 @@ class JobQueue { | |||
private constructor () { | |||
} | |||
init () { | |||
init (produceOnly = false) { | |||
// Already initialized | |||
if (this.initialized === true) return | |||
this.initialized = true | |||
@@ -124,6 +124,12 @@ class JobQueue { | |||
for (const handlerName of (Object.keys(handlers) as JobType[])) { | |||
const queue = new Bull(handlerName, queueOptions) | |||
if (produceOnly) { | |||
queue.pause(true) | |||
.catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err })) | |||
} | |||
const handler = handlers[handlerName] | |||
queue.process(this.getJobConcurrency(handlerName), handler) | |||
@@ -57,10 +57,33 @@ function moveToNextState (video: MVideoUUID, isNewVideo = true) { | |||
}) | |||
} | |||
async function moveToExternalStorageState (video: MVideoFullLight, isNewVideo: boolean, transaction: Transaction) { | |||
const videoJobInfo = await VideoJobInfoModel.load(video.id, transaction) | |||
const pendingTranscode = videoJobInfo?.pendingTranscode || 0 | |||
// We want to wait all transcoding jobs before moving the video on an external storage | |||
if (pendingTranscode !== 0) return false | |||
await video.setNewState(VideoState.TO_MOVE_TO_EXTERNAL_STORAGE, isNewVideo, transaction) | |||
logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) | |||
try { | |||
await addMoveToObjectStorageJob(video, isNewVideo) | |||
return true | |||
} catch (err) { | |||
logger.error('Cannot add move to object storage job', { err }) | |||
return false | |||
} | |||
} | |||
// --------------------------------------------------------------------------- | |||
export { | |||
buildNextVideoState, | |||
moveToExternalStorageState, | |||
moveToNextState | |||
} | |||
@@ -82,18 +105,3 @@ async function moveToPublishedState (video: MVideoFullLight, isNewVideo: boolean | |||
Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(video) | |||
} | |||
} | |||
async function moveToExternalStorageState (video: MVideoFullLight, isNewVideo: boolean, transaction: Transaction) { | |||
const videoJobInfo = await VideoJobInfoModel.load(video.id, transaction) | |||
const pendingTranscode = videoJobInfo?.pendingTranscode || 0 | |||
// We want to wait all transcoding jobs before moving the video on an external storage | |||
if (pendingTranscode !== 0) return | |||
await video.setNewState(VideoState.TO_MOVE_TO_EXTERNAL_STORAGE, isNewVideo, transaction) | |||
logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) | |||
addMoveToObjectStorageJob(video, isNewVideo) | |||
.catch(err => logger.error('Cannot add move to object storage job', { err })) | |||
} |
@@ -805,14 +805,17 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> { | |||
await Promise.all(tasks) | |||
} | |||
static listLocal (): Promise<MVideo[]> { | |||
static listLocalIds (): Promise<number[]> { | |||
const query = { | |||
attributes: [ 'id' ], | |||
raw: true, | |||
where: { | |||
remote: false | |||
} | |||
} | |||
return VideoModel.findAll(query) | |||
.then(rows => rows.map(r => r.id)) | |||
} | |||
static listAllAndSharedByActorForOutbox (actorId: number, start: number, count: number) { | |||
@@ -1674,6 +1677,8 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> { | |||
if (!this.VideoStreamingPlaylists) return undefined | |||
const playlist = this.VideoStreamingPlaylists.find(p => p.type === VideoStreamingPlaylistType.HLS) | |||
if (!playlist) return undefined | |||
playlist.Video = this | |||
return playlist | |||
@@ -1785,7 +1790,7 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> { | |||
await this.save({ transaction }) | |||
} | |||
getBandwidthBits (videoFile: MVideoFile) { | |||
getBandwidthBits (this: MVideo, videoFile: MVideoFile) { | |||
return Math.ceil((videoFile.size * 8) / this.duration) | |||
} | |||
@@ -0,0 +1,114 @@ | |||
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ | |||
import 'mocha' | |||
import { | |||
areObjectStorageTestsDisabled, | |||
cleanupTests, | |||
createMultipleServers, | |||
doubleFollow, | |||
expectStartWith, | |||
makeRawRequest, | |||
ObjectStorageCommand, | |||
PeerTubeServer, | |||
setAccessTokensToServers, | |||
waitJobs | |||
} from '@shared/extra-utils' | |||
import { HttpStatusCode, VideoDetails } from '@shared/models' | |||
async function checkFiles (origin: PeerTubeServer, video: VideoDetails, inObjectStorage: boolean) { | |||
for (const file of video.files) { | |||
const start = inObjectStorage | |||
? ObjectStorageCommand.getWebTorrentBaseUrl() | |||
: origin.url | |||
expectStartWith(file.fileUrl, start) | |||
await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200) | |||
} | |||
const start = inObjectStorage | |||
? ObjectStorageCommand.getPlaylistBaseUrl() | |||
: origin.url | |||
const hls = video.streamingPlaylists[0] | |||
expectStartWith(hls.playlistUrl, start) | |||
expectStartWith(hls.segmentsSha256Url, start) | |||
for (const file of hls.files) { | |||
expectStartWith(file.fileUrl, start) | |||
await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200) | |||
} | |||
} | |||
describe('Test create move video storage job', function () { | |||
if (areObjectStorageTestsDisabled()) return | |||
let servers: PeerTubeServer[] = [] | |||
const uuids: string[] = [] | |||
before(async function () { | |||
this.timeout(360000) | |||
// Run server 2 to have transcoding enabled | |||
servers = await createMultipleServers(2) | |||
await setAccessTokensToServers(servers) | |||
await doubleFollow(servers[0], servers[1]) | |||
await ObjectStorageCommand.prepareDefaultBuckets() | |||
await servers[0].config.enableTranscoding() | |||
for (let i = 0; i < 3; i++) { | |||
const { uuid } = await servers[0].videos.upload({ attributes: { name: 'video' + i } }) | |||
uuids.push(uuid) | |||
} | |||
await waitJobs(servers) | |||
await servers[0].kill() | |||
await servers[0].run(ObjectStorageCommand.getDefaultConfig()) | |||
}) | |||
it('Should move only one file', async function () { | |||
this.timeout(120000) | |||
const command = `npm run create-move-video-storage-job -- --to-object-storage -v ${uuids[1]}` | |||
await servers[0].cli.execWithEnv(command, ObjectStorageCommand.getDefaultConfig()) | |||
await waitJobs(servers) | |||
for (const server of servers) { | |||
const video = await server.videos.get({ id: uuids[1] }) | |||
await checkFiles(servers[0], video, true) | |||
for (const id of [ uuids[0], uuids[2] ]) { | |||
const video = await server.videos.get({ id }) | |||
await checkFiles(servers[0], video, false) | |||
} | |||
} | |||
}) | |||
it('Should move all files', async function () { | |||
this.timeout(120000) | |||
const command = `npm run create-move-video-storage-job -- --to-object-storage --all-videos` | |||
await servers[0].cli.execWithEnv(command, ObjectStorageCommand.getDefaultConfig()) | |||
await waitJobs(servers) | |||
for (const server of servers) { | |||
for (const id of [ uuids[0], uuids[2] ]) { | |||
const video = await server.videos.get({ id }) | |||
await checkFiles(servers[0], video, true) | |||
} | |||
} | |||
}) | |||
after(async function () { | |||
await cleanupTests(servers) | |||
}) | |||
}) |
@@ -1,6 +1,7 @@ | |||
// Order of the tests we want to execute | |||
import './create-import-video-file-job' | |||
import './create-transcoding-job' | |||
import './create-move-video-storage-job' | |||
import './peertube' | |||
import './plugins' | |||
import './print-transcode-command' | |||
@@ -17,7 +17,11 @@ export class CLICommand extends AbstractCommand { | |||
return `NODE_ENV=test NODE_APP_INSTANCE=${this.server.internalServerNumber}` | |||
} | |||
async execWithEnv (command: string) { | |||
return CLICommand.exec(`${this.getEnv()} ${command}`) | |||
async execWithEnv (command: string, configOverride?: any) { | |||
const prefix = configOverride | |||
? `NODE_CONFIG='${JSON.stringify(configOverride)}'` | |||
: '' | |||
return CLICommand.exec(`${prefix} ${this.getEnv()} ${command}`) | |||
} | |||
} |
@@ -17,6 +17,7 @@ | |||
- [regenerate-thumbnails.js](#regenerate-thumbnailsjs) | |||
- [create-transcoding-job.js](#create-transcoding-jobjs) | |||
- [create-import-video-file-job.js](#create-import-video-file-jobjs) | |||
- [create-move-video-storage-job.js](#create-move-video-storage-jobjs) | |||
- [prune-storage.js](#prune-storagejs) | |||
- [update-host.js](#update-hostjs) | |||
- [reset-password.js](#reset-passwordjs) | |||
@@ -303,6 +304,33 @@ $ cd /var/www/peertube-docker | |||
$ docker-compose exec -u peertube peertube npm run create-import-video-file-job -- -v [videoUUID] -i [videoFile] | |||
``` | |||
### create-move-video-storage-job.js | |||
Use this script to move all video files or a specific video file to object storage. | |||
```bash | |||
$ # Basic installation | |||
$ cd /var/www/peertube/peertube-latest | |||
$ sudo -u peertube NODE_CONFIG_DIR=/var/www/peertube/config NODE_ENV=production npm run create-move-video-storage-job -- --to-object-storage -v [videoUUID] | |||
$ # Docker installation | |||
$ cd /var/www/peertube-docker | |||
$ docker-compose exec -u peertube peertube npm run create-move-video-storage-job -- --to-object-storage -v [videoUUID] | |||
``` | |||
The script can also move all video files that are not already in object storage: | |||
```bash | |||
$ # Basic installation | |||
$ cd /var/www/peertube/peertube-latest | |||
$ sudo -u peertube NODE_CONFIG_DIR=/var/www/peertube/config NODE_ENV=production npm run create-move-video-storage-job -- --to-object-storage --all-videos | |||
$ # Docker installation | |||
$ cd /var/www/peertube-docker | |||
$ docker-compose exec -u peertube peertube npm run create-move-video-storage-job -- --to-object-storage --all-videos | |||
``` | |||
### prune-storage.js | |||
Some transcoded videos or shutdown at a bad time can leave some unused files on your storage. | |||