* Add support for saving video files to object storage * Add support for custom url generation on s3 stored files Uses two config keys to support url generation that doesn't directly go to (compatible s3). Can be used to generate urls to any cache server or CDN. * Upload files to s3 concurrently and delete originals afterwards * Only publish after move to object storage is complete * Use base url instead of url template * Fix mistyped config field * Add rudenmentary way to download before transcode * Implement Chocobozzz suggestions https://github.com/Chocobozzz/PeerTube/pull/4290#issuecomment-891670478 The remarks in question: Try to use objectStorage prefix instead of s3 prefix for your function/variables/config names Prefer to use a tree for the config: s3.streaming_playlists_bucket -> object_storage.streaming_playlists.bucket Use uppercase for config: S3.STREAMING_PLAYLISTS_BUCKETINFO.bucket -> OBJECT_STORAGE.STREAMING_PLAYLISTS.BUCKET (maybe BUCKET_NAME instead of BUCKET) I suggest to rename moveJobsRunning to pendingMovingJobs (or better, create a dedicated videoJobInfo table with a pendingMove & videoId columns so we could also use this table to track pending transcoding jobs) https://github.com/Chocobozzz/PeerTube/pull/4290/files#diff-3e26d41ca4bda1de8e1747af70ca2af642abcc1e9e0bfb94239ff2165acfbde5R19 uses a string instead of an integer I think we should store the origin object storage URL in fileUrl, without base_url injection. Instead, inject the base_url at "runtime" so admins can easily change this configuration without running a script to update DB URLs * Import correct function * Support multipart upload * Remove import of node 15.0 module stream/promises * Extend maximum upload job length Using the same value as for redundancy downloading seems logical * Use dynamic part size for really large uploads Also adds very small part size for local testing * Fix decreasePendingMove query * Resolve various PR comments * Move to object storage after optimize * Make upload size configurable and increase default * Prune webtorrent files that are stored in object storage * Move files after transcoding jobs * Fix federation * Add video path manager * Support move to external storage job in client * Fix live object storage tests Co-authored-by: Chocobozzz <me@florianbigard.com>tags/v3.4.0-rc.1
@@ -31,6 +31,11 @@ jobs: | |||
ports: | |||
- 10389:10389 | |||
s3ninja: | |||
image: scireum/s3-ninja | |||
ports: | |||
- 9444:9000 | |||
strategy: | |||
fail-fast: false | |||
matrix: | |||
@@ -40,6 +45,7 @@ jobs: | |||
PGUSER: peertube | |||
PGHOST: localhost | |||
NODE_PENDING_JOB_WAIT: 250 | |||
ENABLE_OBJECT_STORAGE_TESTS: true | |||
steps: | |||
- uses: actions/checkout@v2 | |||
@@ -36,7 +36,8 @@ export class JobsComponent extends RestTable implements OnInit { | |||
'video-live-ending', | |||
'video-redundancy', | |||
'video-transcoding', | |||
'videos-views' | |||
'videos-views', | |||
'move-to-object-storage' | |||
] | |||
jobs: Job[] = [] | |||
@@ -6,6 +6,10 @@ | |||
The video is being transcoded, it may not work properly. | |||
</div> | |||
<div i18n class="alert alert-warning" *ngIf="isVideoToMoveToExternalStorage()"> | |||
The video is being moved to an external server, it may not work properly. | |||
</div> | |||
<div i18n class="alert alert-info" *ngIf="hasVideoScheduledPublication()"> | |||
This video will be published on {{ video.scheduledUpdate.updateAt | date: 'full' }}. | |||
</div> | |||
@@ -18,6 +18,10 @@ export class VideoAlertComponent { | |||
return this.video && this.video.state.id === VideoState.TO_IMPORT | |||
} | |||
isVideoToMoveToExternalStorage () { | |||
return this.video && this.video.state.id === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE | |||
} | |||
hasVideoScheduledPublication () { | |||
return this.video && this.video.scheduledUpdate !== undefined | |||
} | |||
@@ -95,6 +95,39 @@ storage: | |||
# If not, peertube will fallback to the default fil | |||
client_overrides: 'storage/client-overrides/' | |||
object_storage: | |||
enabled: false | |||
# Without protocol, will default to HTTPS | |||
endpoint: '' # 's3.amazonaws.com' or 's3.fr-par.scw.cloud' for example | |||
region: 'us-east-1' | |||
credentials: | |||
# You can also use AWS_ACCESS_KEY_ID env variable | |||
access_key_id: '' | |||
# You can also use AWS_SECRET_ACCESS_KEY env variable | |||
secret_access_key: '' | |||
# Maximum amount to upload in one request to object storage | |||
max_upload_part: 2GB | |||
streaming_playlists: | |||
bucket_name: 'streaming-playlists' | |||
# Allows setting all buckets to the same value but with a different prefix | |||
prefix: '' # Example: 'streaming-playlists:' | |||
# Base url for object URL generation, scheme and host will be replaced by this URL | |||
# Useful when you want to use a CDN/external proxy | |||
base_url: '' # Example: 'https://mirror.example.com' | |||
# Same settings but for webtorrent videos | |||
videos: | |||
bucket_name: 'videos' | |||
prefix: '' | |||
base_url: '' | |||
log: | |||
level: 'info' # 'debug' | 'info' | 'warn' | 'error' | |||
rotation: | |||
@@ -93,6 +93,39 @@ storage: | |||
# If not, peertube will fallback to the default file | |||
client_overrides: '/var/www/peertube/storage/client-overrides/' | |||
object_storage: | |||
enabled: false | |||
# Without protocol, will default to HTTPS | |||
endpoint: '' # 's3.amazonaws.com' or 's3.fr-par.scw.cloud' for example | |||
region: 'us-east-1' | |||
credentials: | |||
# You can also use AWS_ACCESS_KEY_ID env variable | |||
access_key_id: '' | |||
# You can also use AWS_SECRET_ACCESS_KEY env variable | |||
secret_access_key: '' | |||
# Maximum amount to upload in one request to object storage | |||
max_upload_part: 2GB | |||
streaming_playlists: | |||
bucket_name: 'streaming-playlists' | |||
# Allows setting all buckets to the same value but with a different prefix | |||
prefix: '' # Example: 'streaming-playlists:' | |||
# Base url for object URL generation, scheme and host will be replaced by this URL | |||
# Useful when you want to use a CDN/external proxy | |||
base_url: '' # Example: 'https://mirror.example.com' | |||
# Same settings but for webtorrent videos | |||
videos: | |||
bucket_name: 'videos' | |||
prefix: '' | |||
base_url: '' | |||
log: | |||
level: 'info' # 'debug' | 'info' | 'warn' | 'error' | |||
rotation: | |||
@@ -73,6 +73,7 @@ | |||
"swagger-cli": "swagger-cli" | |||
}, | |||
"dependencies": { | |||
"@aws-sdk/client-s3": "^3.23.0", | |||
"@uploadx/core": "^4.4.0", | |||
"async": "^3.0.1", | |||
"async-lru": "^1.1.1", | |||
@@ -89,9 +89,10 @@ elif [ "$1" = "api-4" ]; then | |||
moderationFiles=$(findTestFiles ./dist/server/tests/api/moderation) | |||
redundancyFiles=$(findTestFiles ./dist/server/tests/api/redundancy) | |||
objectStorageFiles=$(findTestFiles ./dist/server/tests/api/object-storage) | |||
activitypubFiles=$(findTestFiles ./dist/server/tests/api/activitypub) | |||
MOCHA_PARALLEL=true TS_NODE_FILES=true runTest "$1" 2 $moderationFiles $redundancyFiles $activitypubFiles | |||
MOCHA_PARALLEL=true TS_NODE_FILES=true runTest "$1" 2 $moderationFiles $redundancyFiles $activitypubFiles $objectStorageFiles | |||
elif [ "$1" = "external-plugins" ]; then | |||
npm run build:server | |||
@@ -6,9 +6,10 @@ import { VideoModel } from '../server/models/video/video' | |||
import { initDatabaseModels } from '../server/initializers/database' | |||
import { JobQueue } from '../server/lib/job-queue' | |||
import { computeResolutionsToTranscode } from '@server/helpers/ffprobe-utils' | |||
import { VideoTranscodingPayload } from '@shared/models' | |||
import { VideoState, VideoTranscodingPayload } from '@shared/models' | |||
import { CONFIG } from '@server/initializers/config' | |||
import { isUUIDValid } from '@server/helpers/custom-validators/misc' | |||
import { addTranscodingJob } from '@server/lib/video' | |||
program | |||
.option('-v, --video [videoUUID]', 'Video UUID') | |||
@@ -47,7 +48,7 @@ async function run () { | |||
if (!video) throw new Error('Video not found.') | |||
const dataInput: VideoTranscodingPayload[] = [] | |||
const { resolution } = await video.getMaxQualityResolution() | |||
const resolution = video.getMaxQualityFile().resolution | |||
// Generate HLS files | |||
if (options.generateHls || CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { | |||
@@ -62,6 +63,7 @@ async function run () { | |||
resolution, | |||
isPortraitMode: false, | |||
copyCodecs: false, | |||
isNewVideo: false, | |||
isMaxQuality: false | |||
}) | |||
} | |||
@@ -87,10 +89,13 @@ async function run () { | |||
} | |||
} | |||
await JobQueue.Instance.init() | |||
JobQueue.Instance.init() | |||
video.state = VideoState.TO_TRANSCODE | |||
await video.save() | |||
for (const d of dataInput) { | |||
await JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: d }) | |||
await addTranscodingJob(d, {}) | |||
console.log('Transcoding job for video %s created.', video.uuid) | |||
} | |||
} |
@@ -1,15 +1,18 @@ | |||
import { registerTSPaths } from '../server/helpers/register-ts-paths' | |||
registerTSPaths() | |||
import { getDurationFromVideoFile, getVideoFileBitrate, getVideoFileFPS, getVideoFileResolution } from '../server/helpers/ffprobe-utils' | |||
import { VideoModel } from '../server/models/video/video' | |||
import { optimizeOriginalVideofile } from '../server/lib/transcoding/video-transcoding' | |||
import { initDatabaseModels } from '../server/initializers/database' | |||
import { basename, dirname } from 'path' | |||
import { copy, move, remove } from 'fs-extra' | |||
import { basename, dirname } from 'path' | |||
import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | |||
import { getVideoFilePath } from '@server/lib/video-paths' | |||
import { CONFIG } from '@server/initializers/config' | |||
import { processMoveToObjectStorage } from '@server/lib/job-queue/handlers/move-to-object-storage' | |||
import { VideoPathManager } from '@server/lib/video-path-manager' | |||
import { getMaxBitrate } from '@shared/core-utils' | |||
import { MoveObjectStoragePayload } from '@shared/models' | |||
import { getDurationFromVideoFile, getVideoFileBitrate, getVideoFileFPS, getVideoFileResolution } from '../server/helpers/ffprobe-utils' | |||
import { initDatabaseModels } from '../server/initializers/database' | |||
import { optimizeOriginalVideofile } from '../server/lib/transcoding/video-transcoding' | |||
import { VideoModel } from '../server/models/video/video' | |||
run() | |||
.then(() => process.exit(0)) | |||
@@ -39,43 +42,49 @@ async function run () { | |||
currentVideoId = video.id | |||
for (const file of video.VideoFiles) { | |||
currentFilePath = getVideoFilePath(video, file) | |||
const [ videoBitrate, fps, dataResolution ] = await Promise.all([ | |||
getVideoFileBitrate(currentFilePath), | |||
getVideoFileFPS(currentFilePath), | |||
getVideoFileResolution(currentFilePath) | |||
]) | |||
const maxBitrate = getMaxBitrate({ ...dataResolution, fps }) | |||
const isMaxBitrateExceeded = videoBitrate > maxBitrate | |||
if (isMaxBitrateExceeded) { | |||
console.log( | |||
'Optimizing video file %s with bitrate %s kbps (max: %s kbps)', | |||
basename(currentFilePath), videoBitrate / 1000, maxBitrate / 1000 | |||
) | |||
const backupFile = `${currentFilePath}_backup` | |||
await copy(currentFilePath, backupFile) | |||
await optimizeOriginalVideofile(video, file) | |||
// Update file path, the video filename changed | |||
currentFilePath = getVideoFilePath(video, file) | |||
const originalDuration = await getDurationFromVideoFile(backupFile) | |||
const newDuration = await getDurationFromVideoFile(currentFilePath) | |||
if (originalDuration === newDuration) { | |||
console.log('Finished optimizing %s', basename(currentFilePath)) | |||
await remove(backupFile) | |||
continue | |||
await VideoPathManager.Instance.makeAvailableVideoFile(video, file, async path => { | |||
currentFilePath = path | |||
const [ videoBitrate, fps, dataResolution ] = await Promise.all([ | |||
getVideoFileBitrate(currentFilePath), | |||
getVideoFileFPS(currentFilePath), | |||
getVideoFileResolution(currentFilePath) | |||
]) | |||
const maxBitrate = getMaxBitrate({ ...dataResolution, fps }) | |||
const isMaxBitrateExceeded = videoBitrate > maxBitrate | |||
if (isMaxBitrateExceeded) { | |||
console.log( | |||
'Optimizing video file %s with bitrate %s kbps (max: %s kbps)', | |||
basename(currentFilePath), videoBitrate / 1000, maxBitrate / 1000 | |||
) | |||
const backupFile = `${currentFilePath}_backup` | |||
await copy(currentFilePath, backupFile) | |||
await optimizeOriginalVideofile(video, file) | |||
// Update file path, the video filename changed | |||
currentFilePath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, file) | |||
const originalDuration = await getDurationFromVideoFile(backupFile) | |||
const newDuration = await getDurationFromVideoFile(currentFilePath) | |||
if (originalDuration === newDuration) { | |||
console.log('Finished optimizing %s', basename(currentFilePath)) | |||
await remove(backupFile) | |||
return | |||
} | |||
console.log('Failed to optimize %s, restoring original', basename(currentFilePath)) | |||
await move(backupFile, currentFilePath, { overwrite: true }) | |||
await createTorrentAndSetInfoHash(video, file) | |||
await file.save() | |||
} | |||
}) | |||
} | |||
console.log('Failed to optimize %s, restoring original', basename(currentFilePath)) | |||
await move(backupFile, currentFilePath, { overwrite: true }) | |||
await createTorrentAndSetInfoHash(video, file) | |||
await file.save() | |||
} | |||
if (CONFIG.OBJECT_STORAGE.ENABLED === true) { | |||
await processMoveToObjectStorage({ data: { videoUUID: video.uuid } as MoveObjectStoragePayload } as any) | |||
} | |||
} | |||
@@ -1,12 +1,21 @@ | |||
import * as express from 'express' | |||
import { move } from 'fs-extra' | |||
import { basename } from 'path' | |||
import { getLowercaseExtension } from '@server/helpers/core-utils' | |||
import { deleteResumableUploadMetaFile, getResumableUploadPath } from '@server/helpers/upload' | |||
import { uuidToShort } from '@server/helpers/uuid' | |||
import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | |||
import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' | |||
import { addOptimizeOrMergeAudioJob, buildLocalVideoFromReq, buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' | |||
import { generateWebTorrentVideoFilename, getVideoFilePath } from '@server/lib/video-paths' | |||
import { generateWebTorrentVideoFilename } from '@server/lib/paths' | |||
import { | |||
addMoveToObjectStorageJob, | |||
addOptimizeOrMergeAudioJob, | |||
buildLocalVideoFromReq, | |||
buildVideoThumbnailsFromReq, | |||
setVideoTags | |||
} from '@server/lib/video' | |||
import { VideoPathManager } from '@server/lib/video-path-manager' | |||
import { buildNextVideoState } from '@server/lib/video-state' | |||
import { openapiOperationDoc } from '@server/middlewares/doc' | |||
import { MVideo, MVideoFile, MVideoFullLight } from '@server/types/models' | |||
import { uploadx } from '@uploadx/core' | |||
@@ -139,23 +148,20 @@ async function addVideo (options: { | |||
const videoData = buildLocalVideoFromReq(videoInfo, videoChannel.id) | |||
videoData.state = CONFIG.TRANSCODING.ENABLED | |||
? VideoState.TO_TRANSCODE | |||
: VideoState.PUBLISHED | |||
videoData.state = buildNextVideoState() | |||
videoData.duration = videoPhysicalFile.duration // duration was added by a previous middleware | |||
const video = new VideoModel(videoData) as MVideoFullLight | |||
video.VideoChannel = videoChannel | |||
video.url = getLocalVideoActivityPubUrl(video) // We use the UUID, so set the URL after building the object | |||
const videoFile = await buildNewFile(video, videoPhysicalFile) | |||
const videoFile = await buildNewFile(videoPhysicalFile) | |||
// Move physical file | |||
const destination = getVideoFilePath(video, videoFile) | |||
const destination = VideoPathManager.Instance.getFSVideoFileOutputPath(video, videoFile) | |||
await move(videoPhysicalFile.path, destination) | |||
// This is important in case if there is another attempt in the retry process | |||
videoPhysicalFile.filename = getVideoFilePath(video, videoFile) | |||
videoPhysicalFile.filename = basename(destination) | |||
videoPhysicalFile.path = destination | |||
const [ thumbnailModel, previewModel ] = await buildVideoThumbnailsFromReq({ | |||
@@ -210,9 +216,13 @@ async function addVideo (options: { | |||
createTorrentFederate(video, videoFile) | |||
.then(() => { | |||
if (video.state !== VideoState.TO_TRANSCODE) return | |||
if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | |||
return addMoveToObjectStorageJob(video) | |||
} | |||
return addOptimizeOrMergeAudioJob(videoCreated, videoFile, user) | |||
if (video.state === VideoState.TO_TRANSCODE) { | |||
return addOptimizeOrMergeAudioJob(videoCreated, videoFile, user) | |||
} | |||
}) | |||
.catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })) | |||
@@ -227,7 +237,7 @@ async function addVideo (options: { | |||
}) | |||
} | |||
async function buildNewFile (video: MVideo, videoPhysicalFile: express.VideoUploadFile) { | |||
async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) { | |||
const videoFile = new VideoFileModel({ | |||
extname: getLowercaseExtension(videoPhysicalFile.filename), | |||
size: videoPhysicalFile.size, | |||
@@ -3,9 +3,9 @@ import * as express from 'express' | |||
import { logger } from '@server/helpers/logger' | |||
import { VideosTorrentCache } from '@server/lib/files-cache/videos-torrent-cache' | |||
import { Hooks } from '@server/lib/plugins/hooks' | |||
import { getVideoFilePath } from '@server/lib/video-paths' | |||
import { VideoPathManager } from '@server/lib/video-path-manager' | |||
import { MStreamingPlaylist, MVideo, MVideoFile, MVideoFullLight } from '@server/types/models' | |||
import { HttpStatusCode, VideoStreamingPlaylistType } from '@shared/models' | |||
import { HttpStatusCode, VideoStorage, VideoStreamingPlaylistType } from '@shared/models' | |||
import { STATIC_DOWNLOAD_PATHS } from '../initializers/constants' | |||
import { asyncMiddleware, videosDownloadValidator } from '../middlewares' | |||
@@ -81,7 +81,15 @@ async function downloadVideoFile (req: express.Request, res: express.Response) { | |||
if (!checkAllowResult(res, allowParameters, allowedResult)) return | |||
return res.download(getVideoFilePath(video, videoFile), `${video.name}-${videoFile.resolution}p${videoFile.extname}`) | |||
if (videoFile.storage === VideoStorage.OBJECT_STORAGE) { | |||
return res.redirect(videoFile.getObjectStorageUrl()) | |||
} | |||
await VideoPathManager.Instance.makeAvailableVideoFile(video, videoFile, path => { | |||
const filename = `${video.name}-${videoFile.resolution}p${videoFile.extname}` | |||
return res.download(path, filename) | |||
}) | |||
} | |||
async function downloadHLSVideoFile (req: express.Request, res: express.Response) { | |||
@@ -107,8 +115,15 @@ async function downloadHLSVideoFile (req: express.Request, res: express.Response | |||
if (!checkAllowResult(res, allowParameters, allowedResult)) return | |||
const filename = `${video.name}-${videoFile.resolution}p-${streamingPlaylist.getStringType()}${videoFile.extname}` | |||
return res.download(getVideoFilePath(streamingPlaylist, videoFile), filename) | |||
if (videoFile.storage === VideoStorage.OBJECT_STORAGE) { | |||
return res.redirect(videoFile.getObjectStorageUrl()) | |||
} | |||
await VideoPathManager.Instance.makeAvailableVideoFile(streamingPlaylist, videoFile, path => { | |||
const filename = `${video.name}-${videoFile.resolution}p-${streamingPlaylist.getStringType()}${videoFile.extname}` | |||
return res.download(path, filename) | |||
}) | |||
} | |||
function getVideoFile (req: express.Request, files: MVideoFile[]) { | |||
@@ -6,7 +6,8 @@ import { dirname, join } from 'path' | |||
import * as WebTorrent from 'webtorrent' | |||
import { isArray } from '@server/helpers/custom-validators/misc' | |||
import { WEBSERVER } from '@server/initializers/constants' | |||
import { generateTorrentFileName, getVideoFilePath } from '@server/lib/video-paths' | |||
import { generateTorrentFileName } from '@server/lib/paths' | |||
import { VideoPathManager } from '@server/lib/video-path-manager' | |||
import { MVideo } from '@server/types/models/video/video' | |||
import { MVideoFile, MVideoFileRedundanciesOpt } from '@server/types/models/video/video-file' | |||
import { MStreamingPlaylistVideo } from '@server/types/models/video/video-streaming-playlist' | |||
@@ -78,7 +79,7 @@ async function downloadWebTorrentVideo (target: { magnetUri: string, torrentName | |||
}) | |||
} | |||
async function createTorrentAndSetInfoHash ( | |||
function createTorrentAndSetInfoHash ( | |||
videoOrPlaylist: MVideo | MStreamingPlaylistVideo, | |||
videoFile: MVideoFile | |||
) { | |||
@@ -95,22 +96,24 @@ async function createTorrentAndSetInfoHash ( | |||
urlList: [ videoFile.getFileUrl(video) ] | |||
} | |||
const torrent = await createTorrentPromise(getVideoFilePath(videoOrPlaylist, videoFile), options) | |||
return VideoPathManager.Instance.makeAvailableVideoFile(videoOrPlaylist, videoFile, async videoPath => { | |||
const torrent = await createTorrentPromise(videoPath, options) | |||
const torrentFilename = generateTorrentFileName(videoOrPlaylist, videoFile.resolution) | |||
const torrentPath = join(CONFIG.STORAGE.TORRENTS_DIR, torrentFilename) | |||
logger.info('Creating torrent %s.', torrentPath) | |||
const torrentFilename = generateTorrentFileName(videoOrPlaylist, videoFile.resolution) | |||
const torrentPath = join(CONFIG.STORAGE.TORRENTS_DIR, torrentFilename) | |||
logger.info('Creating torrent %s.', torrentPath) | |||
await writeFile(torrentPath, torrent) | |||
await writeFile(torrentPath, torrent) | |||
// Remove old torrent file if it existed | |||
if (videoFile.hasTorrent()) { | |||
await remove(join(CONFIG.STORAGE.TORRENTS_DIR, videoFile.torrentFilename)) | |||
} | |||
// Remove old torrent file if it existed | |||
if (videoFile.hasTorrent()) { | |||
await remove(join(CONFIG.STORAGE.TORRENTS_DIR, videoFile.torrentFilename)) | |||
} | |||
const parsedTorrent = parseTorrent(torrent) | |||
videoFile.infoHash = parsedTorrent.infoHash | |||
videoFile.torrentFilename = torrentFilename | |||
const parsedTorrent = parseTorrent(torrent) | |||
videoFile.infoHash = parsedTorrent.infoHash | |||
videoFile.torrentFilename = torrentFilename | |||
}) | |||
} | |||
function generateMagnetUri ( | |||
@@ -153,6 +153,29 @@ function checkConfig () { | |||
} | |||
} | |||
// Object storage | |||
if (CONFIG.OBJECT_STORAGE.ENABLED === true) { | |||
if (!CONFIG.OBJECT_STORAGE.VIDEOS.BUCKET_NAME) { | |||
return 'videos_bucket should be set when object storage support is enabled.' | |||
} | |||
if (!CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS.BUCKET_NAME) { | |||
return 'streaming_playlists_bucket should be set when object storage support is enabled.' | |||
} | |||
if ( | |||
CONFIG.OBJECT_STORAGE.VIDEOS.BUCKET_NAME === CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS.BUCKET_NAME && | |||
CONFIG.OBJECT_STORAGE.VIDEOS.PREFIX === CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS.PREFIX | |||
) { | |||
if (CONFIG.OBJECT_STORAGE.VIDEOS.PREFIX === '') { | |||
return 'Object storage bucket prefixes should be set when the same bucket is used for both types of video.' | |||
} else { | |||
return 'Object storage bucket prefixes should be set to different values when the same bucket is used for both types of video.' | |||
} | |||
} | |||
} | |||
return null | |||
} | |||
@@ -73,6 +73,26 @@ const CONFIG = { | |||
PLUGINS_DIR: buildPath(config.get<string>('storage.plugins')), | |||
CLIENT_OVERRIDES_DIR: buildPath(config.get<string>('storage.client_overrides')) | |||
}, | |||
OBJECT_STORAGE: { | |||
ENABLED: config.get<boolean>('object_storage.enabled'), | |||
MAX_UPLOAD_PART: bytes.parse(config.get<string>('object_storage.max_upload_part')), | |||
ENDPOINT: config.get<string>('object_storage.endpoint'), | |||
REGION: config.get<string>('object_storage.region'), | |||
CREDENTIALS: { | |||
ACCESS_KEY_ID: config.get<string>('object_storage.credentials.access_key_id'), | |||
SECRET_ACCESS_KEY: config.get<string>('object_storage.credentials.secret_access_key') | |||
}, | |||
VIDEOS: { | |||
BUCKET_NAME: config.get<string>('object_storage.videos.bucket_name'), | |||
PREFIX: config.get<string>('object_storage.videos.prefix'), | |||
BASE_URL: config.get<string>('object_storage.videos.base_url') | |||
}, | |||
STREAMING_PLAYLISTS: { | |||
BUCKET_NAME: config.get<string>('object_storage.streaming_playlists.bucket_name'), | |||
PREFIX: config.get<string>('object_storage.streaming_playlists.prefix'), | |||
BASE_URL: config.get<string>('object_storage.streaming_playlists.base_url') | |||
} | |||
}, | |||
WEBSERVER: { | |||
SCHEME: config.get<boolean>('webserver.https') === true ? 'https' : 'http', | |||
WS: config.get<boolean>('webserver.https') === true ? 'wss' : 'ws', | |||
@@ -24,7 +24,7 @@ import { CONFIG, registerConfigChangedHandler } from './config' | |||
// --------------------------------------------------------------------------- | |||
const LAST_MIGRATION_VERSION = 655 | |||
const LAST_MIGRATION_VERSION = 660 | |||
// --------------------------------------------------------------------------- | |||
@@ -147,7 +147,8 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { | |||
'videos-views': 1, | |||
'activitypub-refresher': 1, | |||
'video-redundancy': 1, | |||
'video-live-ending': 1 | |||
'video-live-ending': 1, | |||
'move-to-object-storage': 3 | |||
} | |||
// Excluded keys are jobs that can be configured by admins | |||
const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = { | |||
@@ -162,7 +163,8 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im | |||
'videos-views': 1, | |||
'activitypub-refresher': 1, | |||
'video-redundancy': 1, | |||
'video-live-ending': 10 | |||
'video-live-ending': 10, | |||
'move-to-object-storage': 1 | |||
} | |||
const JOB_TTL: { [id in JobType]: number } = { | |||
'activitypub-http-broadcast': 60000 * 10, // 10 minutes | |||
@@ -178,7 +180,8 @@ const JOB_TTL: { [id in JobType]: number } = { | |||
'videos-views': undefined, // Unlimited | |||
'activitypub-refresher': 60000 * 10, // 10 minutes | |||
'video-redundancy': 1000 * 3600 * 3, // 3 hours | |||
'video-live-ending': 1000 * 60 * 10 // 10 minutes | |||
'video-live-ending': 1000 * 60 * 10, // 10 minutes | |||
'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours | |||
} | |||
const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { | |||
'videos-views': { | |||
@@ -412,7 +415,8 @@ const VIDEO_STATES: { [ id in VideoState ]: string } = { | |||
[VideoState.TO_TRANSCODE]: 'To transcode', | |||
[VideoState.TO_IMPORT]: 'To import', | |||
[VideoState.WAITING_FOR_LIVE]: 'Waiting for livestream', | |||
[VideoState.LIVE_ENDED]: 'Livestream ended' | |||
[VideoState.LIVE_ENDED]: 'Livestream ended', | |||
[VideoState.TO_MOVE_TO_EXTERNAL_STORAGE]: 'To move to an external storage' | |||
} | |||
const VIDEO_IMPORT_STATES: { [ id in VideoImportState ]: string } = { | |||
@@ -45,6 +45,7 @@ import { VideoTagModel } from '../models/video/video-tag' | |||
import { VideoViewModel } from '../models/video/video-view' | |||
import { CONFIG } from './config' | |||
import { ActorCustomPageModel } from '@server/models/account/actor-custom-page' | |||
import { VideoJobInfoModel } from '@server/models/video/video-job-info' | |||
require('pg').defaults.parseInt8 = true // Avoid BIGINT to be converted to string | |||
@@ -143,7 +144,8 @@ async function initDatabaseModels (silent: boolean) { | |||
TrackerModel, | |||
VideoTrackerModel, | |||
PluginModel, | |||
ActorCustomPageModel | |||
ActorCustomPageModel, | |||
VideoJobInfoModel | |||
]) | |||
// Check extensions exist in the database | |||
@@ -1,7 +1,4 @@ | |||
import * as Sequelize from 'sequelize' | |||
import { stat } from 'fs-extra' | |||
import { VideoModel } from '../../models/video/video' | |||
import { getVideoFilePath } from '@server/lib/video-paths' | |||
function up (utils: { | |||
transaction: Sequelize.Transaction | |||
@@ -9,30 +6,7 @@ function up (utils: { | |||
sequelize: Sequelize.Sequelize | |||
db: any | |||
}): Promise<void> { | |||
return utils.db.Video.listOwnedAndPopulateAuthorAndTags() | |||
.then((videos: VideoModel[]) => { | |||
const tasks: Promise<any>[] = [] | |||
videos.forEach(video => { | |||
video.VideoFiles.forEach(videoFile => { | |||
const p = new Promise((res, rej) => { | |||
stat(getVideoFilePath(video, videoFile), (err, stats) => { | |||
if (err) return rej(err) | |||
videoFile.size = stats.size | |||
videoFile.save().then(res).catch(rej) | |||
}) | |||
}) | |||
tasks.push(p) | |||
}) | |||
}) | |||
return tasks | |||
}) | |||
.then((tasks: Promise<any>[]) => { | |||
return Promise.all(tasks) | |||
}) | |||
throw new Error('Removed, please upgrade from a previous version first.') | |||
} | |||
function down (options) { | |||
@@ -0,0 +1,58 @@ | |||
import * as Sequelize from 'sequelize' | |||
import { VideoStorage } from '@shared/models' | |||
async function up (utils: { | |||
transaction: Sequelize.Transaction | |||
queryInterface: Sequelize.QueryInterface | |||
sequelize: Sequelize.Sequelize | |||
db: any | |||
}): Promise<void> { | |||
{ | |||
const query = ` | |||
CREATE TABLE IF NOT EXISTS "videoJobInfo" ( | |||
"id" serial, | |||
"pendingMove" INTEGER NOT NULL, | |||
"pendingTranscode" INTEGER NOT NULL, | |||
"videoId" serial UNIQUE NOT NULL REFERENCES "video" ("id") ON DELETE CASCADE ON UPDATE CASCADE, | |||
"createdAt" timestamp WITH time zone NOT NULL, | |||
"updatedAt" timestamp WITH time zone NOT NULL, | |||
PRIMARY KEY ("id") | |||
); | |||
` | |||
await utils.sequelize.query(query) | |||
} | |||
{ | |||
await utils.queryInterface.addColumn('videoFile', 'storage', { type: Sequelize.INTEGER, allowNull: true }) | |||
} | |||
{ | |||
await utils.sequelize.query( | |||
`UPDATE "videoFile" SET "storage" = ${VideoStorage.FILE_SYSTEM}` | |||
) | |||
} | |||
{ | |||
await utils.queryInterface.changeColumn('videoFile', 'storage', { type: Sequelize.INTEGER, allowNull: false }) | |||
} | |||
{ | |||
await utils.queryInterface.addColumn('videoStreamingPlaylist', 'storage', { type: Sequelize.INTEGER, allowNull: true }) | |||
} | |||
{ | |||
await utils.sequelize.query( | |||
`UPDATE "videoStreamingPlaylist" SET "storage" = ${VideoStorage.FILE_SYSTEM}` | |||
) | |||
} | |||
{ | |||
await utils.queryInterface.changeColumn('videoStreamingPlaylist', 'storage', { type: Sequelize.INTEGER, allowNull: false }) | |||
} | |||
} | |||
function down (options) { | |||
throw new Error('Not implemented.') | |||
} | |||
export { | |||
up, | |||
down | |||
} |
@@ -6,7 +6,7 @@ import { isVideoFileInfoHashValid } from '@server/helpers/custom-validators/vide | |||
import { logger } from '@server/helpers/logger' | |||
import { getExtFromMimetype } from '@server/helpers/video' | |||
import { ACTIVITY_PUB, MIMETYPES, P2P_MEDIA_LOADER_PEER_VERSION, PREVIEWS_SIZE, THUMBNAILS_SIZE } from '@server/initializers/constants' | |||
import { generateTorrentFileName } from '@server/lib/video-paths' | |||
import { generateTorrentFileName } from '@server/lib/paths' | |||
import { VideoCaptionModel } from '@server/models/video/video-caption' | |||
import { VideoFileModel } from '@server/models/video/video-file' | |||
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | |||
@@ -1,4 +1,4 @@ | |||
import { close, ensureDir, move, open, outputJSON, pathExists, read, readFile, remove, stat, writeFile } from 'fs-extra' | |||
import { close, ensureDir, move, open, outputJSON, read, readFile, remove, stat, writeFile } from 'fs-extra' | |||
import { flatten, uniq } from 'lodash' | |||
import { basename, dirname, join } from 'path' | |||
import { MStreamingPlaylistFilesVideo, MVideoWithFile } from '@server/types/models' | |||
@@ -8,11 +8,12 @@ import { logger } from '../helpers/logger' | |||
import { doRequest, doRequestAndSaveToFile } from '../helpers/requests' | |||
import { generateRandomString } from '../helpers/utils' | |||
import { CONFIG } from '../initializers/config' | |||
import { HLS_STREAMING_PLAYLIST_DIRECTORY, P2P_MEDIA_LOADER_PEER_VERSION } from '../initializers/constants' | |||
import { P2P_MEDIA_LOADER_PEER_VERSION } from '../initializers/constants' | |||
import { sequelizeTypescript } from '../initializers/database' | |||
import { VideoFileModel } from '../models/video/video-file' | |||
import { VideoStreamingPlaylistModel } from '../models/video/video-streaming-playlist' | |||
import { getHlsResolutionPlaylistFilename, getVideoFilePath } from './video-paths' | |||
import { getHlsResolutionPlaylistFilename } from './paths' | |||
import { VideoPathManager } from './video-path-manager' | |||
async function updateStreamingPlaylistsInfohashesIfNeeded () { | |||
const playlistsToUpdate = await VideoStreamingPlaylistModel.listByIncorrectPeerVersion() | |||
@@ -31,75 +32,66 @@ async function updateStreamingPlaylistsInfohashesIfNeeded () { | |||
} | |||
async function updateMasterHLSPlaylist (video: MVideoWithFile, playlist: MStreamingPlaylistFilesVideo) { | |||
const directory = join(HLS_STREAMING_PLAYLIST_DIRECTORY, video.uuid) | |||
const masterPlaylists: string[] = [ '#EXTM3U', '#EXT-X-VERSION:3' ] | |||
const masterPlaylistPath = join(directory, playlist.playlistFilename) | |||
for (const file of playlist.VideoFiles) { | |||
const playlistFilename = getHlsResolutionPlaylistFilename(file.filename) | |||
// If we did not generated a playlist for this resolution, skip | |||
const filePlaylistPath = join(directory, playlistFilename) | |||
if (await pathExists(filePlaylistPath) === false) continue | |||
const videoFilePath = getVideoFilePath(playlist, file) | |||
await VideoPathManager.Instance.makeAvailableVideoFile(playlist, file, async videoFilePath => { | |||
const size = await getVideoStreamSize(videoFilePath) | |||
const size = await getVideoStreamSize(videoFilePath) | |||
const bandwidth = 'BANDWIDTH=' + video.getBandwidthBits(file) | |||
const resolution = `RESOLUTION=${size.width}x${size.height}` | |||
const bandwidth = 'BANDWIDTH=' + video.getBandwidthBits(file) | |||
const resolution = `RESOLUTION=${size.width}x${size.height}` | |||
let line = `#EXT-X-STREAM-INF:${bandwidth},${resolution}` | |||
if (file.fps) line += ',FRAME-RATE=' + file.fps | |||
let line = `#EXT-X-STREAM-INF:${bandwidth},${resolution}` | |||
if (file.fps) line += ',FRAME-RATE=' + file.fps | |||
const codecs = await Promise.all([ | |||
getVideoStreamCodec(videoFilePath), | |||
getAudioStreamCodec(videoFilePath) | |||
]) | |||
const codecs = await Promise.all([ | |||
getVideoStreamCodec(videoFilePath), | |||
getAudioStreamCodec(videoFilePath) | |||
]) | |||
line += `,CODECS="${codecs.filter(c => !!c).join(',')}"` | |||
line += `,CODECS="${codecs.filter(c => !!c).join(',')}"` | |||
masterPlaylists.push(line) | |||
masterPlaylists.push(playlistFilename) | |||
masterPlaylists.push(line) | |||
masterPlaylists.push(playlistFilename) | |||
}) | |||
} | |||
await writeFile(masterPlaylistPath, masterPlaylists.join('\n') + '\n') | |||
await VideoPathManager.Instance.makeAvailablePlaylistFile(playlist, playlist.playlistFilename, masterPlaylistPath => { | |||
return writeFile(masterPlaylistPath, masterPlaylists.join('\n') + '\n') | |||
}) | |||
} | |||
async function updateSha256VODSegments (video: MVideoWithFile, playlist: MStreamingPlaylistFilesVideo) { | |||
const json: { [filename: string]: { [range: string]: string } } = {} | |||
const playlistDirectory = join(HLS_STREAMING_PLAYLIST_DIRECTORY, video.uuid) | |||
// For all the resolutions available for this video | |||
for (const file of playlist.VideoFiles) { | |||
const rangeHashes: { [range: string]: string } = {} | |||
const videoPath = getVideoFilePath(playlist, file) | |||
const resolutionPlaylistPath = join(playlistDirectory, getHlsResolutionPlaylistFilename(file.filename)) | |||
// Maybe the playlist is not generated for this resolution yet | |||
if (!await pathExists(resolutionPlaylistPath)) continue | |||
await VideoPathManager.Instance.makeAvailableVideoFile(playlist, file, videoPath => { | |||
const playlistContent = await readFile(resolutionPlaylistPath) | |||
const ranges = getRangesFromPlaylist(playlistContent.toString()) | |||
return VideoPathManager.Instance.makeAvailableResolutionPlaylistFile(playlist, file, async resolutionPlaylistPath => { | |||
const playlistContent = await readFile(resolutionPlaylistPath) | |||
const ranges = getRangesFromPlaylist(playlistContent.toString()) | |||
const fd = await open(videoPath, 'r') | |||
for (const range of ranges) { | |||
const buf = Buffer.alloc(range.length) | |||
await read(fd, buf, 0, range.length, range.offset) | |||
const fd = await open(videoPath, 'r') | |||
for (const range of ranges) { | |||
const buf = Buffer.alloc(range.length) | |||
await read(fd, buf, 0, range.length, range.offset) | |||
rangeHashes[`${range.offset}-${range.offset + range.length - 1}`] = sha256(buf) | |||
} | |||
await close(fd) | |||
rangeHashes[`${range.offset}-${range.offset + range.length - 1}`] = sha256(buf) | |||
} | |||
await close(fd) | |||
const videoFilename = file.filename | |||
json[videoFilename] = rangeHashes | |||
const videoFilename = file.filename | |||
json[videoFilename] = rangeHashes | |||
}) | |||
}) | |||
} | |||
const outputPath = join(playlistDirectory, playlist.segmentsSha256Filename) | |||
const outputPath = VideoPathManager.Instance.getFSHLSOutputPath(video, playlist.segmentsSha256Filename) | |||
await outputJSON(outputPath, json) | |||
} | |||
@@ -0,0 +1,114 @@ | |||
import * as Bull from 'bull' | |||
import { remove } from 'fs-extra' | |||
import { join } from 'path' | |||
import { logger } from '@server/helpers/logger' | |||
import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | |||
import { CONFIG } from '@server/initializers/config' | |||
import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage' | |||
import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths' | |||
import { moveToNextState } from '@server/lib/video-state' | |||
import { VideoModel } from '@server/models/video/video' | |||
import { VideoJobInfoModel } from '@server/models/video/video-job-info' | |||
import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models' | |||
import { MoveObjectStoragePayload, VideoStorage } from '../../../../shared' | |||
export async function processMoveToObjectStorage (job: Bull.Job) { | |||
const payload = job.data as MoveObjectStoragePayload | |||
logger.info('Moving video %s in job %d.', payload.videoUUID, job.id) | |||
const video = await VideoModel.loadWithFiles(payload.videoUUID) | |||
// No video, maybe deleted? | |||
if (!video) { | |||
logger.info('Can\'t process job %d, video does not exist.', job.id) | |||
return undefined | |||
} | |||
if (video.VideoFiles) { | |||
await moveWebTorrentFiles(video) | |||
} | |||
if (video.VideoStreamingPlaylists) { | |||
await moveHLSFiles(video) | |||
} | |||
const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') | |||
if (pendingMove === 0) { | |||
logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id) | |||
await doAfterLastJob(video, payload.isNewVideo) | |||
} | |||
return payload.videoUUID | |||
} | |||
// --------------------------------------------------------------------------- | |||
async function moveWebTorrentFiles (video: MVideoWithAllFiles) { | |||
for (const file of video.VideoFiles) { | |||
if (file.storage !== VideoStorage.FILE_SYSTEM) continue | |||
const fileUrl = await storeWebTorrentFile(file.filename) | |||
const oldPath = join(CONFIG.STORAGE.VIDEOS_DIR, file.filename) | |||
await onFileMoved({ videoOrPlaylist: video, file, fileUrl, oldPath }) | |||
} | |||
} | |||
async function moveHLSFiles (video: MVideoWithAllFiles) { | |||
for (const playlist of video.VideoStreamingPlaylists) { | |||
for (const file of playlist.VideoFiles) { | |||
if (file.storage !== VideoStorage.FILE_SYSTEM) continue | |||
// Resolution playlist | |||
const playlistFilename = getHlsResolutionPlaylistFilename(file.filename) | |||
await storeHLSFile(playlist, video, playlistFilename) | |||
// Resolution fragmented file | |||
const fileUrl = await storeHLSFile(playlist, video, file.filename) | |||
const oldPath = join(getHLSDirectory(video), file.filename) | |||
await onFileMoved({ videoOrPlaylist: Object.assign(playlist, { Video: video }), file, fileUrl, oldPath }) | |||
} | |||
} | |||
} | |||
async function doAfterLastJob (video: MVideoWithAllFiles, isNewVideo: boolean) { | |||
for (const playlist of video.VideoStreamingPlaylists) { | |||
if (playlist.storage === VideoStorage.OBJECT_STORAGE) continue | |||
// Master playlist | |||
playlist.playlistUrl = await storeHLSFile(playlist, video, playlist.playlistFilename) | |||
// Sha256 segments file | |||
playlist.segmentsSha256Url = await storeHLSFile(playlist, video, playlist.segmentsSha256Filename) | |||
playlist.storage = VideoStorage.OBJECT_STORAGE | |||
await playlist.save() | |||
} | |||
// Remove empty hls video directory | |||
if (video.VideoStreamingPlaylists) { | |||
await remove(getHLSDirectory(video)) | |||
} | |||
await moveToNextState(video, isNewVideo) | |||
} | |||
async function onFileMoved (options: { | |||
videoOrPlaylist: MVideo | MStreamingPlaylistVideo | |||
file: MVideoFile | |||
fileUrl: string | |||
oldPath: string | |||
}) { | |||
const { videoOrPlaylist, file, fileUrl, oldPath } = options | |||
file.fileUrl = fileUrl | |||
file.storage = VideoStorage.OBJECT_STORAGE | |||
await createTorrentAndSetInfoHash(videoOrPlaylist, file) | |||
await file.save() | |||
logger.debug('Removing %s because it\'s now on object storage', oldPath) | |||
await remove(oldPath) | |||
} |
@@ -2,15 +2,19 @@ import * as Bull from 'bull' | |||
import { copy, stat } from 'fs-extra' | |||
import { getLowercaseExtension } from '@server/helpers/core-utils' | |||
import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | |||
import { generateWebTorrentVideoFilename, getVideoFilePath } from '@server/lib/video-paths' | |||
import { CONFIG } from '@server/initializers/config' | |||
import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | |||
import { generateWebTorrentVideoFilename } from '@server/lib/paths' | |||
import { addMoveToObjectStorageJob } from '@server/lib/video' | |||
import { VideoPathManager } from '@server/lib/video-path-manager' | |||
import { UserModel } from '@server/models/user/user' | |||
import { MVideoFullLight } from '@server/types/models' | |||
import { VideoFileImportPayload } from '@shared/models' | |||
import { VideoFileImportPayload, VideoStorage } from '@shared/models' | |||
import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils' | |||
import { logger } from '../../../helpers/logger' | |||
import { VideoModel } from '../../../models/video/video' | |||
import { VideoFileModel } from '../../../models/video/video-file' | |||
import { onNewWebTorrentFileResolution } from './video-transcoding' | |||
import { createHlsJobIfEnabled } from './video-transcoding' | |||
async function processVideoFileImport (job: Bull.Job) { | |||
const payload = job.data as VideoFileImportPayload | |||
@@ -29,15 +33,19 @@ async function processVideoFileImport (job: Bull.Job) { | |||
const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId) | |||
const newResolutionPayload = { | |||
type: 'new-resolution-to-webtorrent' as 'new-resolution-to-webtorrent', | |||
await createHlsJobIfEnabled(user, { | |||
videoUUID: video.uuid, | |||
resolution: data.resolution, | |||
isPortraitMode: data.isPortraitMode, | |||
copyCodecs: false, | |||
isNewVideo: false | |||
copyCodecs: true, | |||
isMaxQuality: false | |||
}) | |||
if (CONFIG.OBJECT_STORAGE.ENABLED) { | |||
await addMoveToObjectStorageJob(video) | |||
} else { | |||
await federateVideoIfNeeded(video, false) | |||
} | |||
await onNewWebTorrentFileResolution(video, user, newResolutionPayload) | |||
return video | |||
} | |||
@@ -72,12 +80,13 @@ async function updateVideoFile (video: MVideoFullLight, inputFilePath: string) { | |||
resolution, | |||
extname: fileExt, | |||
filename: generateWebTorrentVideoFilename(resolution, fileExt), | |||
storage: VideoStorage.FILE_SYSTEM, | |||
size, | |||
fps, | |||
videoId: video.id | |||
}) | |||
const outputPath = getVideoFilePath(video, newVideoFile) | |||
const outputPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, newVideoFile) | |||
await copy(inputFilePath, outputPath) | |||
video.VideoFiles.push(newVideoFile) | |||
@@ -4,11 +4,13 @@ import { getLowercaseExtension } from '@server/helpers/core-utils' | |||
import { retryTransactionWrapper } from '@server/helpers/database-utils' | |||
import { YoutubeDL } from '@server/helpers/youtube-dl' | |||
import { isPostImportVideoAccepted } from '@server/lib/moderation' | |||
import { generateWebTorrentVideoFilename } from '@server/lib/paths' | |||
import { Hooks } from '@server/lib/plugins/hooks' | |||
import { ServerConfigManager } from '@server/lib/server-config-manager' | |||
import { isAbleToUploadVideo } from '@server/lib/user' | |||
import { addOptimizeOrMergeAudioJob } from '@server/lib/video' | |||
import { generateWebTorrentVideoFilename, getVideoFilePath } from '@server/lib/video-paths' | |||
import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video' | |||
import { VideoPathManager } from '@server/lib/video-path-manager' | |||
import { buildNextVideoState } from '@server/lib/video-state' | |||
import { ThumbnailModel } from '@server/models/video/thumbnail' | |||
import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' | |||
import { | |||
@@ -25,7 +27,6 @@ import { getDurationFromVideoFile, getVideoFileFPS, getVideoFileResolution } fro | |||
import { logger } from '../../../helpers/logger' | |||
import { getSecureTorrentName } from '../../../helpers/utils' | |||
import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' | |||
import { CONFIG } from '../../../initializers/config' | |||
import { VIDEO_IMPORT_TIMEOUT } from '../../../initializers/constants' | |||
import { sequelizeTypescript } from '../../../initializers/database' | |||
import { VideoModel } from '../../../models/video/video' | |||
@@ -100,7 +101,6 @@ type ProcessFileOptions = { | |||
} | |||
async function processFile (downloader: () => Promise<string>, videoImport: MVideoImportDefault, options: ProcessFileOptions) { | |||
let tempVideoPath: string | |||
let videoDestFile: string | |||
let videoFile: VideoFileModel | |||
try { | |||
@@ -159,7 +159,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
const videoImportWithFiles: MVideoImportDefaultFiles = Object.assign(videoImport, { Video: videoWithFiles }) | |||
// Move file | |||
videoDestFile = getVideoFilePath(videoImportWithFiles.Video, videoFile) | |||
const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile) | |||
await move(tempVideoPath, videoDestFile) | |||
tempVideoPath = null // This path is not used anymore | |||
@@ -204,7 +204,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
// Update video DB object | |||
video.duration = duration | |||
video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED | |||
video.state = buildNextVideoState(video.state) | |||
await video.save({ transaction: t }) | |||
if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) | |||
@@ -245,6 +245,10 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
Notifier.Instance.notifyOnNewVideoIfNeeded(video) | |||
} | |||
if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | |||
return addMoveToObjectStorageJob(videoImportUpdated.Video) | |||
} | |||
// Create transcoding jobs? | |||
if (video.state === VideoState.TO_TRANSCODE) { | |||
await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile, videoImport.User) | |||
@@ -4,10 +4,11 @@ import { join } from 'path' | |||
import { ffprobePromise, getAudioStream, getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils' | |||
import { VIDEO_LIVE } from '@server/initializers/constants' | |||
import { buildConcatenatedName, cleanupLive, LiveSegmentShaStore } from '@server/lib/live' | |||
import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveDirectory } from '@server/lib/paths' | |||
import { generateVideoMiniature } from '@server/lib/thumbnail' | |||
import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/video-transcoding' | |||
import { publishAndFederateIfNeeded } from '@server/lib/video' | |||
import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getHLSDirectory } from '@server/lib/video-paths' | |||
import { VideoPathManager } from '@server/lib/video-path-manager' | |||
import { moveToNextState } from '@server/lib/video-state' | |||
import { VideoModel } from '@server/models/video/video' | |||
import { VideoFileModel } from '@server/models/video/video-file' | |||
import { VideoLiveModel } from '@server/models/video/video-live' | |||
@@ -55,16 +56,15 @@ export { | |||
// --------------------------------------------------------------------------- | |||
async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MStreamingPlaylist) { | |||
const hlsDirectory = getHLSDirectory(video, false) | |||
const replayDirectory = join(hlsDirectory, VIDEO_LIVE.REPLAY_DIRECTORY) | |||
const replayDirectory = VideoPathManager.Instance.getFSHLSOutputPath(video, VIDEO_LIVE.REPLAY_DIRECTORY) | |||
const rootFiles = await readdir(hlsDirectory) | |||
const rootFiles = await readdir(getLiveDirectory(video)) | |||
const playlistFiles = rootFiles.filter(file => { | |||
return file.endsWith('.m3u8') && file !== streamingPlaylist.playlistFilename | |||
}) | |||
await cleanupLiveFiles(hlsDirectory) | |||
await cleanupTMPLiveFiles(getLiveDirectory(video)) | |||
await live.destroy() | |||
@@ -98,7 +98,7 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt | |||
const { resolution, isPortraitMode } = await getVideoFileResolution(concatenatedTsFilePath, probe) | |||
const outputPath = await generateHlsPlaylistResolutionFromTS({ | |||
const { resolutionPlaylistPath: outputPath } = await generateHlsPlaylistResolutionFromTS({ | |||
video: videoWithFiles, | |||
concatenatedTsFilePath, | |||
resolution, | |||
@@ -133,10 +133,10 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt | |||
}) | |||
} | |||
await publishAndFederateIfNeeded(videoWithFiles, true) | |||
await moveToNextState(videoWithFiles, false) | |||
} | |||
async function cleanupLiveFiles (hlsDirectory: string) { | |||
async function cleanupTMPLiveFiles (hlsDirectory: string) { | |||
if (!await pathExists(hlsDirectory)) return | |||
const files = await readdir(hlsDirectory) | |||
@@ -1,9 +1,11 @@ | |||
import * as Bull from 'bull' | |||
import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' | |||
import { getTranscodingJobPriority, publishAndFederateIfNeeded } from '@server/lib/video' | |||
import { getVideoFilePath } from '@server/lib/video-paths' | |||
import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' | |||
import { VideoPathManager } from '@server/lib/video-path-manager' | |||
import { moveToNextState } from '@server/lib/video-state' | |||
import { UserModel } from '@server/models/user/user' | |||
import { MUser, MUserId, MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' | |||
import { VideoJobInfoModel } from '@server/models/video/video-job-info' | |||
import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models' | |||
import { | |||
HLSTranscodingPayload, | |||
MergeAudioTranscodingPayload, | |||
@@ -16,17 +18,14 @@ import { computeResolutionsToTranscode } from '../../../helpers/ffprobe-utils' | |||
import { logger } from '../../../helpers/logger' | |||
import { CONFIG } from '../../../initializers/config' | |||
import { VideoModel } from '../../../models/video/video' | |||
import { federateVideoIfNeeded } from '../../activitypub/videos' | |||
import { Notifier } from '../../notifier' | |||
import { | |||
generateHlsPlaylistResolution, | |||
mergeAudioVideofile, | |||
optimizeOriginalVideofile, | |||
transcodeNewWebTorrentResolution | |||
} from '../../transcoding/video-transcoding' | |||
import { JobQueue } from '../job-queue' | |||
type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<any> | |||
type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> | |||
const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = { | |||
'new-resolution-to-hls': handleHLSJob, | |||
@@ -69,15 +68,16 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide | |||
: video.getMaxQualityFile() | |||
const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() | |||
const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput) | |||
await generateHlsPlaylistResolution({ | |||
video, | |||
videoInputPath, | |||
resolution: payload.resolution, | |||
copyCodecs: payload.copyCodecs, | |||
isPortraitMode: payload.isPortraitMode || false, | |||
job | |||
await VideoPathManager.Instance.makeAvailableVideoFile(videoOrStreamingPlaylist, videoFileInput, videoInputPath => { | |||
return generateHlsPlaylistResolution({ | |||
video, | |||
videoInputPath, | |||
resolution: payload.resolution, | |||
copyCodecs: payload.copyCodecs, | |||
isPortraitMode: payload.isPortraitMode || false, | |||
job | |||
}) | |||
}) | |||
await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload) | |||
@@ -101,7 +101,7 @@ async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudio | |||
} | |||
async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { | |||
const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) | |||
const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) | |||
await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user) | |||
} | |||
@@ -121,10 +121,18 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, pay | |||
video.VideoFiles = [] | |||
// Create HLS new resolution jobs | |||
await createLowerResolutionsJobs(video, user, payload.resolution, payload.isPortraitMode, 'hls') | |||
await createLowerResolutionsJobs({ | |||
video, | |||
user, | |||
videoFileResolution: payload.resolution, | |||
isPortraitMode: payload.isPortraitMode, | |||
isNewVideo: payload.isNewVideo ?? true, | |||
type: 'hls' | |||
}) | |||
} | |||
return publishAndFederateIfNeeded(video) | |||
await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | |||
await moveToNextState(video, payload.isNewVideo) | |||
} | |||
async function onVideoFileOptimizer ( | |||
@@ -143,58 +151,54 @@ async function onVideoFileOptimizer ( | |||
// Video does not exist anymore | |||
if (!videoDatabase) return undefined | |||
let videoPublished = false | |||
// Generate HLS version of the original file | |||
const originalFileHLSPayload = Object.assign({}, payload, { | |||
const originalFileHLSPayload = { | |||
...payload, | |||
isPortraitMode, | |||
resolution: videoDatabase.getMaxQualityFile().resolution, | |||
// If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues | |||
copyCodecs: transcodeType !== 'quick-transcode', | |||
isMaxQuality: true | |||
}) | |||
} | |||
const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) | |||
const hasNewResolutions = await createLowerResolutionsJobs({ | |||
video: videoDatabase, | |||
user, | |||
videoFileResolution: resolution, | |||
isPortraitMode, | |||
type: 'webtorrent', | |||
isNewVideo: payload.isNewVideo ?? true | |||
}) | |||
const hasNewResolutions = await createLowerResolutionsJobs(videoDatabase, user, resolution, isPortraitMode, 'webtorrent') | |||
await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') | |||
// Move to next state if there are no other resolutions to generate | |||
if (!hasHls && !hasNewResolutions) { | |||
// No transcoding to do, it's now published | |||
videoPublished = await videoDatabase.publishIfNeededAndSave(undefined) | |||
await moveToNextState(videoDatabase, payload.isNewVideo) | |||
} | |||
await federateVideoIfNeeded(videoDatabase, payload.isNewVideo) | |||
if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase) | |||
if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) | |||
} | |||
async function onNewWebTorrentFileResolution ( | |||
video: MVideoUUID, | |||
video: MVideo, | |||
user: MUserId, | |||
payload: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload | |||
) { | |||
await publishAndFederateIfNeeded(video) | |||
await createHlsJobIfEnabled(user, { ...payload, copyCodecs: true, isMaxQuality: false }) | |||
await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | |||
await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true, isMaxQuality: false })) | |||
await moveToNextState(video, payload.isNewVideo) | |||
} | |||
// --------------------------------------------------------------------------- | |||
export { | |||
processVideoTranscoding, | |||
onNewWebTorrentFileResolution | |||
} | |||
// --------------------------------------------------------------------------- | |||
async function createHlsJobIfEnabled (user: MUserId, payload: { | |||
videoUUID: string | |||
resolution: number | |||
isPortraitMode?: boolean | |||
copyCodecs: boolean | |||
isMaxQuality: boolean | |||
isNewVideo?: boolean | |||
}) { | |||
if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false | |||
if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false | |||
const jobOptions = { | |||
priority: await getTranscodingJobPriority(user) | |||
@@ -206,21 +210,35 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { | |||
resolution: payload.resolution, | |||
isPortraitMode: payload.isPortraitMode, | |||
copyCodecs: payload.copyCodecs, | |||
isMaxQuality: payload.isMaxQuality | |||
isMaxQuality: payload.isMaxQuality, | |||
isNewVideo: payload.isNewVideo | |||
} | |||
JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) | |||
await addTranscodingJob(hlsTranscodingPayload, jobOptions) | |||
return true | |||
} | |||
async function createLowerResolutionsJobs ( | |||
video: MVideoFullLight, | |||
user: MUserId, | |||
videoFileResolution: number, | |||
isPortraitMode: boolean, | |||
// --------------------------------------------------------------------------- | |||
export { | |||
processVideoTranscoding, | |||
createHlsJobIfEnabled, | |||
onNewWebTorrentFileResolution | |||
} | |||
// --------------------------------------------------------------------------- | |||
async function createLowerResolutionsJobs (options: { | |||
video: MVideoFullLight | |||
user: MUserId | |||
videoFileResolution: number | |||
isPortraitMode: boolean | |||
isNewVideo: boolean | |||
type: 'hls' | 'webtorrent' | |||
) { | |||
}) { | |||
const { video, user, videoFileResolution, isPortraitMode, isNewVideo, type } = options | |||
// Create transcoding jobs if there are enabled resolutions | |||
const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') | |||
const resolutionCreated: number[] = [] | |||
@@ -234,7 +252,8 @@ async function createLowerResolutionsJobs ( | |||
type: 'new-resolution-to-webtorrent', | |||
videoUUID: video.uuid, | |||
resolution, | |||
isPortraitMode | |||
isPortraitMode, | |||
isNewVideo | |||
} | |||
} | |||
@@ -245,7 +264,8 @@ async function createLowerResolutionsJobs ( | |||
resolution, | |||
isPortraitMode, | |||
copyCodecs: false, | |||
isMaxQuality: false | |||
isMaxQuality: false, | |||
isNewVideo | |||
} | |||
} | |||
@@ -257,7 +277,7 @@ async function createLowerResolutionsJobs ( | |||
priority: await getTranscodingJobPriority(user) | |||
} | |||
JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }, jobOptions) | |||
await addTranscodingJob(dataInput, jobOptions) | |||
} | |||
if (resolutionCreated.length === 0) { | |||
@@ -11,6 +11,7 @@ import { | |||
EmailPayload, | |||
JobState, | |||
JobType, | |||
MoveObjectStoragePayload, | |||
RefreshPayload, | |||
VideoFileImportPayload, | |||
VideoImportPayload, | |||
@@ -34,6 +35,7 @@ import { processVideoImport } from './handlers/video-import' | |||
import { processVideoLiveEnding } from './handlers/video-live-ending' | |||
import { processVideoTranscoding } from './handlers/video-transcoding' | |||
import { processVideosViews } from './handlers/video-views' | |||
import { processMoveToObjectStorage } from './handlers/move-to-object-storage' | |||
type CreateJobArgument = | |||
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | |||
@@ -49,9 +51,10 @@ type CreateJobArgument = | |||
{ type: 'videos-views', payload: {} } | | |||
{ type: 'video-live-ending', payload: VideoLiveEndingPayload } | | |||
{ type: 'actor-keys', payload: ActorKeysPayload } | | |||
{ type: 'video-redundancy', payload: VideoRedundancyPayload } | |||
{ type: 'video-redundancy', payload: VideoRedundancyPayload } | | |||
{ type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | |||
type CreateJobOptions = { | |||
export type CreateJobOptions = { | |||
delay?: number | |||
priority?: number | |||
} | |||
@@ -70,7 +73,8 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | |||
'activitypub-refresher': refreshAPObject, | |||
'video-live-ending': processVideoLiveEnding, | |||
'actor-keys': processActorKeys, | |||
'video-redundancy': processVideoRedundancy | |||
'video-redundancy': processVideoRedundancy, | |||
'move-to-object-storage': processMoveToObjectStorage | |||
} | |||
const jobTypes: JobType[] = [ | |||
@@ -87,7 +91,8 @@ const jobTypes: JobType[] = [ | |||
'activitypub-refresher', | |||
'video-redundancy', | |||
'actor-keys', | |||
'video-live-ending' | |||
'video-live-ending', | |||
'move-to-object-storage' | |||
] | |||
class JobQueue { | |||
@@ -20,7 +20,7 @@ import { VideoState, VideoStreamingPlaylistType } from '@shared/models' | |||
import { federateVideoIfNeeded } from '../activitypub/videos' | |||
import { JobQueue } from '../job-queue' | |||
import { PeerTubeSocket } from '../peertube-socket' | |||
import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../video-paths' | |||
import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths' | |||
import { LiveQuotaStore } from './live-quota-store' | |||
import { LiveSegmentShaStore } from './live-segment-sha-store' | |||
import { cleanupLive } from './live-utils' | |||
@@ -1,7 +1,7 @@ | |||
import { remove } from 'fs-extra' | |||
import { basename } from 'path' | |||
import { MStreamingPlaylist, MVideo } from '@server/types/models' | |||
import { getHLSDirectory } from '../video-paths' | |||
import { getLiveDirectory } from '../paths' | |||
function buildConcatenatedName (segmentOrPlaylistPath: string) { | |||
const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/) | |||
@@ -10,7 +10,7 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) { | |||
} | |||
async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { | |||
const hlsDirectory = getHLSDirectory(video) | |||
const hlsDirectory = getLiveDirectory(video) | |||
await remove(hlsDirectory) | |||
@@ -11,9 +11,9 @@ import { CONFIG } from '@server/initializers/config' | |||
import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' | |||
import { VideoFileModel } from '@server/models/video/video-file' | |||
import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' | |||
import { getLiveDirectory } from '../../paths' | |||
import { VideoTranscodingProfilesManager } from '../../transcoding/video-transcoding-profiles' | |||
import { isAbleToUploadVideo } from '../../user' | |||
import { getHLSDirectory } from '../../video-paths' | |||
import { LiveQuotaStore } from '../live-quota-store' | |||
import { LiveSegmentShaStore } from '../live-segment-sha-store' | |||
import { buildConcatenatedName } from '../live-utils' | |||
@@ -282,7 +282,7 @@ class MuxingSession extends EventEmitter { | |||
} | |||
private async prepareDirectories () { | |||
const outPath = getHLSDirectory(this.videoLive.Video) | |||
const outPath = getLiveDirectory(this.videoLive.Video) | |||
await ensureDir(outPath) | |||
const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY) | |||
@@ -0,0 +1,3 @@ | |||
export * from './keys' | |||
export * from './urls' | |||
export * from './videos' |
@@ -0,0 +1,20 @@ | |||
import { join } from 'path' | |||
import { MStreamingPlaylist, MVideoUUID } from '@server/types/models' | |||
function generateHLSObjectStorageKey (playlist: MStreamingPlaylist, video: MVideoUUID, filename: string) { | |||
return join(generateHLSObjectBaseStorageKey(playlist, video), filename) | |||
} | |||
function generateHLSObjectBaseStorageKey (playlist: MStreamingPlaylist, video: MVideoUUID) { | |||
return playlist.getStringType() + '_' + video.uuid | |||
} | |||
function generateWebTorrentObjectStorageKey (filename: string) { | |||
return filename | |||
} | |||
export { | |||
generateHLSObjectStorageKey, | |||
generateHLSObjectBaseStorageKey, | |||
generateWebTorrentObjectStorageKey | |||
} |
@@ -0,0 +1,56 @@ | |||
import { S3Client } from '@aws-sdk/client-s3' | |||
import { logger } from '@server/helpers/logger' | |||
import { CONFIG } from '@server/initializers/config' | |||
import { lTags } from './logger' | |||
let endpointParsed: URL | |||
function getEndpointParsed () { | |||
if (endpointParsed) return endpointParsed | |||
endpointParsed = new URL(getEndpoint()) | |||
return endpointParsed | |||
} | |||
let s3Client: S3Client | |||
function getClient () { | |||
if (s3Client) return s3Client | |||
const OBJECT_STORAGE = CONFIG.OBJECT_STORAGE | |||
s3Client = new S3Client({ | |||
endpoint: getEndpoint(), | |||
region: OBJECT_STORAGE.REGION, | |||
credentials: OBJECT_STORAGE.CREDENTIALS.ACCESS_KEY_ID | |||
? { | |||
accessKeyId: OBJECT_STORAGE.CREDENTIALS.ACCESS_KEY_ID, | |||
secretAccessKey: OBJECT_STORAGE.CREDENTIALS.SECRET_ACCESS_KEY | |||
} | |||
: undefined | |||
}) | |||
logger.info('Initialized S3 client %s with region %s.', getEndpoint(), OBJECT_STORAGE.REGION, lTags()) | |||
return s3Client | |||
} | |||
// --------------------------------------------------------------------------- | |||
export { | |||
getEndpointParsed, | |||
getClient | |||
} | |||
// --------------------------------------------------------------------------- | |||
let endpoint: string | |||
function getEndpoint () { | |||
if (endpoint) return endpoint | |||
const endpointConfig = CONFIG.OBJECT_STORAGE.ENDPOINT | |||
endpoint = endpointConfig.startsWith('http://') || endpointConfig.startsWith('https://') | |||
? CONFIG.OBJECT_STORAGE.ENDPOINT | |||
: 'https://' + CONFIG.OBJECT_STORAGE.ENDPOINT | |||
return endpoint | |||
} |
@@ -0,0 +1,3 @@ | |||
export * from './client' | |||
export * from './logger' | |||
export * from './object-storage-helpers' |
@@ -0,0 +1,7 @@ | |||
import { loggerTagsFactory } from '@server/helpers/logger' | |||
const lTags = loggerTagsFactory('object-storage') | |||
export { | |||
lTags | |||
} |
@@ -0,0 +1,229 @@ | |||
import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra' | |||
import { min } from 'lodash' | |||
import { dirname } from 'path' | |||
import { Readable } from 'stream' | |||
import { | |||
CompletedPart, | |||
CompleteMultipartUploadCommand, | |||
CreateMultipartUploadCommand, | |||
DeleteObjectCommand, | |||
GetObjectCommand, | |||
ListObjectsV2Command, | |||
PutObjectCommand, | |||
UploadPartCommand | |||
} from '@aws-sdk/client-s3' | |||
import { pipelinePromise } from '@server/helpers/core-utils' | |||
import { isArray } from '@server/helpers/custom-validators/misc' | |||
import { logger } from '@server/helpers/logger' | |||
import { CONFIG } from '@server/initializers/config' | |||
import { getPrivateUrl } from '../urls' | |||
import { getClient } from './client' | |||
import { lTags } from './logger' | |||
type BucketInfo = { | |||
BUCKET_NAME: string | |||
PREFIX?: string | |||
} | |||
async function storeObject (options: { | |||
inputPath: string | |||
objectStorageKey: string | |||
bucketInfo: BucketInfo | |||
}): Promise<string> { | |||
const { inputPath, objectStorageKey, bucketInfo } = options | |||
logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) | |||
const stats = await stat(inputPath) | |||
// If bigger than max allowed size we do a multipart upload | |||
if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) { | |||
return multiPartUpload({ inputPath, objectStorageKey, bucketInfo }) | |||
} | |||
const fileStream = createReadStream(inputPath) | |||
return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo }) | |||
} | |||
async function removeObject (filename: string, bucketInfo: BucketInfo) { | |||
const command = new DeleteObjectCommand({ | |||
Bucket: bucketInfo.BUCKET_NAME, | |||
Key: buildKey(filename, bucketInfo) | |||
}) | |||
return getClient().send(command) | |||
} | |||
async function removePrefix (prefix: string, bucketInfo: BucketInfo) { | |||
const s3Client = getClient() | |||
const commandPrefix = bucketInfo.PREFIX + prefix | |||
const listCommand = new ListObjectsV2Command({ | |||
Bucket: bucketInfo.BUCKET_NAME, | |||
Prefix: commandPrefix | |||
}) | |||
const listedObjects = await s3Client.send(listCommand) | |||
// FIXME: use bulk delete when s3ninja will support this operation | |||
// const deleteParams = { | |||
// Bucket: bucketInfo.BUCKET_NAME, | |||
// Delete: { Objects: [] } | |||
// } | |||
if (isArray(listedObjects.Contents) !== true) { | |||
const message = `Cannot remove ${commandPrefix} prefix in bucket ${bucketInfo.BUCKET_NAME}: no files listed.` | |||
logger.error(message, { response: listedObjects, ...lTags() }) | |||
throw new Error(message) | |||
} | |||
for (const object of listedObjects.Contents) { | |||
const command = new DeleteObjectCommand({ | |||
Bucket: bucketInfo.BUCKET_NAME, | |||
Key: object.Key | |||
}) | |||
await s3Client.send(command) | |||
// FIXME: use bulk delete when s3ninja will support this operation | |||
// deleteParams.Delete.Objects.push({ Key: object.Key }) | |||
} | |||
// FIXME: use bulk delete when s3ninja will support this operation | |||
// const deleteCommand = new DeleteObjectsCommand(deleteParams) | |||
// await s3Client.send(deleteCommand) | |||
// Repeat if not all objects could be listed at once (limit of 1000?) | |||
if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo) | |||
} | |||
async function makeAvailable (options: { | |||
key: string | |||
destination: string | |||
bucketInfo: BucketInfo | |||
}) { | |||
const { key, destination, bucketInfo } = options | |||
await ensureDir(dirname(options.destination)) | |||
const command = new GetObjectCommand({ | |||
Bucket: bucketInfo.BUCKET_NAME, | |||
Key: buildKey(key, bucketInfo) | |||
}) | |||
const response = await getClient().send(command) | |||
const file = createWriteStream(destination) | |||
await pipelinePromise(response.Body as Readable, file) | |||
file.close() | |||
} | |||
function buildKey (key: string, bucketInfo: BucketInfo) { | |||
return bucketInfo.PREFIX + key | |||
} | |||
// --------------------------------------------------------------------------- | |||
export { | |||
BucketInfo, | |||
buildKey, | |||
storeObject, | |||
removeObject, | |||
removePrefix, | |||
makeAvailable | |||
} | |||
// --------------------------------------------------------------------------- | |||
async function objectStoragePut (options: { | |||
objectStorageKey: string | |||
content: ReadStream | |||
bucketInfo: BucketInfo | |||
}) { | |||
const { objectStorageKey, content, bucketInfo } = options | |||
const command = new PutObjectCommand({ | |||
Bucket: bucketInfo.BUCKET_NAME, | |||
Key: buildKey(objectStorageKey, bucketInfo), | |||
Body: content | |||
}) | |||
await getClient().send(command) | |||
return getPrivateUrl(bucketInfo, objectStorageKey) | |||
} | |||
async function multiPartUpload (options: { | |||
inputPath: string | |||
objectStorageKey: string | |||
bucketInfo: BucketInfo | |||
}) { | |||
const { objectStorageKey, inputPath, bucketInfo } = options | |||
const key = buildKey(objectStorageKey, bucketInfo) | |||
const s3Client = getClient() | |||
const statResult = await stat(inputPath) | |||
const createMultipartCommand = new CreateMultipartUploadCommand({ | |||
Bucket: bucketInfo.BUCKET_NAME, | |||
Key: key | |||
}) | |||
const createResponse = await s3Client.send(createMultipartCommand) | |||
const fd = await open(inputPath, 'r') | |||
let partNumber = 1 | |||
const parts: CompletedPart[] = [] | |||
const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART | |||
for (let start = 0; start < statResult.size; start += partSize) { | |||
logger.debug( | |||
'Uploading part %d of file to %s%s in bucket %s', | |||
partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags() | |||
) | |||
// FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released | |||
// The s3 sdk needs to know the length of the http body beforehand, but doesn't support | |||
// streams with start and end set, so it just tries to stat the file in stream.path. | |||
// This fails for us because we only want to send part of the file. The stream type | |||
// is modified so we can set the byteLength here, which s3 detects because array buffers | |||
// have this field set | |||
const stream: ReadStream & { byteLength: number } = | |||
createReadStream( | |||
inputPath, | |||
{ fd, autoClose: false, start, end: (start + partSize) - 1 } | |||
) as ReadStream & { byteLength: number } | |||
// Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength | |||
stream.byteLength = min([ statResult.size - start, partSize ]) | |||
const uploadPartCommand = new UploadPartCommand({ | |||
Bucket: bucketInfo.BUCKET_NAME, | |||
Key: key, | |||
UploadId: createResponse.UploadId, | |||
PartNumber: partNumber, | |||
Body: stream | |||
}) | |||
const uploadResponse = await s3Client.send(uploadPartCommand) | |||
parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber }) | |||
partNumber += 1 | |||
} | |||
await close(fd) | |||
const completeUploadCommand = new CompleteMultipartUploadCommand({ | |||
Bucket: bucketInfo.BUCKET_NAME, | |||
Key: objectStorageKey, | |||
UploadId: createResponse.UploadId, | |||
MultipartUpload: { Parts: parts } | |||
}) | |||
await s3Client.send(completeUploadCommand) | |||
logger.debug( | |||
'Completed %s%s in bucket %s in %d parts', | |||
bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags() | |||
) | |||
return getPrivateUrl(bucketInfo, objectStorageKey) | |||
} |
@@ -0,0 +1,40 @@ | |||
import { CONFIG } from '@server/initializers/config' | |||
import { BucketInfo, buildKey, getEndpointParsed } from './shared' | |||
function getPrivateUrl (config: BucketInfo, keyWithoutPrefix: string) { | |||
return getBaseUrl(config) + buildKey(keyWithoutPrefix, config) | |||
} | |||
function getWebTorrentPublicFileUrl (fileUrl: string) { | |||