Browse Source

Fix S3 live sync

Ensure TS chunks referenced in M3U8 playlist are already uploaded on S3
pull/5340/merge
Chocobozzz 2 weeks ago
parent
commit
34023e1253
No known key found for this signature in database GPG Key ID: 583A612D890159BE
4 changed files with 72 additions and 30 deletions
  1. +45
    -29
      server/lib/live/shared/muxing-session.ts
  2. +16
    -1
      server/lib/object-storage/shared/object-storage-helpers.ts
  3. +10
    -0
      server/lib/object-storage/videos.ts
  4. +1
    -0
      server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts

+ 45
- 29
server/lib/live/shared/muxing-session.ts View File

@@ -8,7 +8,12 @@ import { computeOutputFPS } from '@server/helpers/ffmpeg'
import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
import {
removeHLSFileObjectStorageByPath,
storeHLSFileFromContent,
storeHLSFileFromFilename,
storeHLSFileFromPath
} from '@server/lib/object-storage'
import { VideoFileModel } from '@server/models/video/video-file'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
@@ -74,6 +79,9 @@ class MuxingSession extends EventEmitter {

private readonly lTags: LoggerTagsFn

// Path -> Queue
private readonly objectStorageSendQueues = new Map<string, PQueue>()

private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}

private streamingPlaylist: MStreamingPlaylistVideo
@@ -151,7 +159,6 @@ class MuxingSession extends EventEmitter {

this.watchMasterFile()
this.watchTSFiles()
this.watchM3U8File()
}

abort () {
@@ -192,41 +199,15 @@ class MuxingSession extends EventEmitter {
})
}

private watchM3U8File () {
const sendQueues = new Map<string, PQueue>()

const onChange = async (m3u8Path: string) => {
if (m3u8Path.endsWith('.m3u8') !== true) return
if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return

logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags())

try {
if (!sendQueues.has(m3u8Path)) {
sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
}

const queue = sendQueues.get(m3u8Path)
await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
} catch (err) {
logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
}
}

this.filesWatcher.on('change', onChange)
}

private watchTSFiles () {
const startStreamDateTime = new Date().getTime()

const playlistIdMatcher = /^([\d+])-/

const addHandler = async (segmentPath: string) => {
if (segmentPath.endsWith('.ts') !== true) return

logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags())

const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
const playlistId = this.getPlaylistIdFromTS(segmentPath)

const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
this.processSegments(segmentsToProcess)
@@ -349,6 +330,8 @@ class MuxingSession extends EventEmitter {
if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
try {
await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)

await this.processM3U8ToObjectStorage(segmentPath)
} catch (err) {
logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
}
@@ -362,6 +345,29 @@ class MuxingSession extends EventEmitter {
}
}

private async processM3U8ToObjectStorage (segmentPath: string) {
const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath))

logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags())

const segmentName = basename(segmentPath)

const playlistContent = await readFile(m3u8Path, 'utf-8')
// Remove new chunk references, that will be processed later
const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n'

try {
if (!this.objectStorageSendQueues.has(m3u8Path)) {
this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
}

const queue = this.objectStorageSendQueues.get(m3u8Path)
await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent))
} catch (err) {
logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
}
}

private onTranscodingError () {
this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
}
@@ -484,6 +490,16 @@ class MuxingSession extends EventEmitter {
? new RemoteTranscodingWrapper(options)
: new FFmpegTranscodingWrapper(options)
}

private getPlaylistIdFromTS (segmentPath: string) {
const playlistIdMatcher = /^([\d+])-/

return basename(segmentPath).match(playlistIdMatcher)[1]
}

private getPlaylistNameFromTS (segmentPath: string) {
return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8`
}
}

// ---------------------------------------------------------------------------


+ 16
- 1
server/lib/object-storage/shared/object-storage-helpers.ts View File

@@ -59,6 +59,20 @@ async function storeObject (options: {
return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
}

async function storeContent (options: {
content: string
inputPath: string
objectStorageKey: string
bucketInfo: BucketInfo
isPrivate: boolean
}): Promise<string> {
const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options

logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())

return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate })
}

// ---------------------------------------------------------------------------

async function updateObjectACL (options: {
@@ -206,6 +220,7 @@ export {
buildKey,

storeObject,
storeContent,

removeObject,
removeObjectByFullKey,
@@ -223,7 +238,7 @@ export {
// ---------------------------------------------------------------------------

async function uploadToStorage (options: {
content: ReadStream
content: ReadStream | string
objectStorageKey: string
bucketInfo: BucketInfo
isPrivate: boolean


+ 10
- 0
server/lib/object-storage/videos.ts View File

@@ -42,6 +42,15 @@ function storeHLSFileFromPath (playlist: MStreamingPlaylistVideo, path: string)
})
}

function storeHLSFileFromContent (playlist: MStreamingPlaylistVideo, path: string, content: string) {
return storeObject({
inputPath: path,
objectStorageKey: generateHLSObjectStorageKey(playlist, basename(path)),
bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS,
isPrivate: playlist.Video.hasPrivateStaticPath()
})
}

// ---------------------------------------------------------------------------

function storeWebTorrentFile (video: MVideo, file: MVideoFile) {
@@ -166,6 +175,7 @@ export {
storeWebTorrentFile,
storeHLSFileFromFilename,
storeHLSFileFromPath,
storeHLSFileFromContent,

updateWebTorrentFileACL,
updateHLSFilesACL,


+ 1
- 0
server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts View File

@@ -80,6 +80,7 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateO
const outputDirectory = privatePayload.outputDirectory
const videoUUID = privatePayload.videoUUID

// Always process the chunk first before moving m3u8 that references this chunk
if (updatePayload.type === 'add-chunk') {
await move(
updatePayload.videoChunkFile as string,


Loading…
Cancel
Save