@@ -32,6 +32,7 @@ export class JobsComponent extends RestTable implements OnInit { | |||
'video-import', | |||
'videos-views', | |||
'activitypub-refresher', | |||
'video-live-ending', | |||
'video-redundancy' | |||
] | |||
@@ -86,7 +86,7 @@ export class MyAccountNotificationPreferencesComponent implements OnInit { | |||
} | |||
private savePreferencesImpl () { | |||
this.userNotificationService.updateNotificationSettings(this.user, this.user.notificationSettings) | |||
this.userNotificationService.updateNotificationSettings(this.user.notificationSettings) | |||
.subscribe( | |||
() => { | |||
this.notifier.success($localize`Preferences saved`, undefined, 2000) | |||
@@ -195,7 +195,7 @@ | |||
</ng-template> | |||
</ng-container> | |||
<ng-container ngbNavItem *ngIf="videoLive"> | |||
<ng-container ngbNavItem *ngIf="liveVideo"> | |||
<a ngbNavLink i18n>Live settings</a> | |||
<ng-template ngbNavContent> | |||
@@ -203,13 +203,13 @@ | |||
<div class="col-md-12"> | |||
<div class="form-group"> | |||
<label for="videoLiveRTMPUrl" i18n>Live RTMP Url</label> | |||
<my-input-readonly-copy id="videoLiveRTMPUrl" [value]="videoLive.rtmpUrl"></my-input-readonly-copy> | |||
<label for="liveVideoRTMPUrl" i18n>Live RTMP Url</label> | |||
<my-input-readonly-copy id="liveVideoRTMPUrl" [value]="liveVideo.rtmpUrl"></my-input-readonly-copy> | |||
</div> | |||
<div class="form-group"> | |||
<label for="videoLiveStreamKey" i18n>Live stream key</label> | |||
<my-input-readonly-copy id="videoLiveStreamKey" [value]="videoLive.streamKey"></my-input-readonly-copy> | |||
<label for="liveVideoStreamKey" i18n>Live stream key</label> | |||
<my-input-readonly-copy id="liveVideoStreamKey" [value]="liveVideo.streamKey"></my-input-readonly-copy> | |||
</div> | |||
</div> | |||
</div> | |||
@@ -20,7 +20,7 @@ import { | |||
import { FormReactiveValidationMessages, FormValidatorService, SelectChannelItem } from '@app/shared/shared-forms' | |||
import { InstanceService } from '@app/shared/shared-instance' | |||
import { VideoCaptionEdit, VideoEdit, VideoService } from '@app/shared/shared-main' | |||
import { ServerConfig, VideoConstant, VideoLive, VideoPrivacy } from '@shared/models' | |||
import { ServerConfig, VideoConstant, LiveVideo, VideoPrivacy } from '@shared/models' | |||
import { RegisterClientFormFieldOptions, RegisterClientVideoFieldOptions } from '@shared/models/plugins/register-client-form-field.model' | |||
import { I18nPrimengCalendarService } from './i18n-primeng-calendar.service' | |||
import { VideoCaptionAddModalComponent } from './video-caption-add-modal.component' | |||
@@ -42,7 +42,7 @@ export class VideoEditComponent implements OnInit, OnDestroy { | |||
@Input() videoCaptions: (VideoCaptionEdit & { captionPath?: string })[] = [] | |||
@Input() waitTranscodingEnabled = true | |||
@Input() type: VideoEditType | |||
@Input() videoLive: VideoLive | |||
@Input() liveVideo: LiveVideo | |||
@ViewChild('videoCaptionAddModal', { static: true }) videoCaptionAddModal: VideoCaptionAddModalComponent | |||
@@ -31,7 +31,7 @@ | |||
<form [hidden]="!isInUpdateForm" novalidate [formGroup]="form"> | |||
<my-video-edit | |||
[form]="form" [formErrors]="formErrors" [videoCaptions]="videoCaptions" [schedulePublicationPossible]="false" | |||
[validationMessages]="validationMessages" [userVideoChannels]="userVideoChannels" [videoLive]="videoLive" | |||
[validationMessages]="validationMessages" [userVideoChannels]="userVideoChannels" [liveVideo]="liveVideo" | |||
type="go-live" | |||
></my-video-edit> | |||
@@ -4,9 +4,9 @@ import { Router } from '@angular/router' | |||
import { AuthService, CanComponentDeactivate, Notifier, ServerService } from '@app/core' | |||
import { scrollToTop } from '@app/helpers' | |||
import { FormValidatorService } from '@app/shared/shared-forms' | |||
import { VideoCaptionService, VideoEdit, VideoService, VideoLiveService } from '@app/shared/shared-main' | |||
import { LiveVideoService, VideoCaptionService, VideoEdit, VideoService } from '@app/shared/shared-main' | |||
import { LoadingBarService } from '@ngx-loading-bar/core' | |||
import { VideoCreate, VideoLive, VideoPrivacy } from '@shared/models' | |||
import { LiveVideo, VideoCreate, VideoPrivacy } from '@shared/models' | |||
import { VideoSend } from './video-send' | |||
@Component({ | |||
@@ -23,7 +23,7 @@ export class VideoGoLiveComponent extends VideoSend implements OnInit, CanCompon | |||
isInUpdateForm = false | |||
videoLive: VideoLive | |||
liveVideo: LiveVideo | |||
videoId: number | |||
videoUUID: string | |||
error: string | |||
@@ -38,7 +38,7 @@ export class VideoGoLiveComponent extends VideoSend implements OnInit, CanCompon | |||
protected serverService: ServerService, | |||
protected videoService: VideoService, | |||
protected videoCaptionService: VideoCaptionService, | |||
private videoLiveService: VideoLiveService, | |||
private liveVideoService: LiveVideoService, | |||
private router: Router | |||
) { | |||
super() | |||
@@ -69,7 +69,7 @@ export class VideoGoLiveComponent extends VideoSend implements OnInit, CanCompon | |||
const toPatch = Object.assign({}, video, { privacy: this.firstStepPrivacyId }) | |||
this.form.patchValue(toPatch) | |||
this.videoLiveService.goLive(video).subscribe( | |||
this.liveVideoService.goLive(video).subscribe( | |||
res => { | |||
this.videoId = res.video.id | |||
this.videoUUID = res.video.uuid | |||
@@ -114,10 +114,10 @@ export class VideoGoLiveComponent extends VideoSend implements OnInit, CanCompon | |||
} | |||
private fetchVideoLive () { | |||
this.videoLiveService.getVideoLive(this.videoId) | |||
this.liveVideoService.getVideoLive(this.videoId) | |||
.subscribe( | |||
videoLive => { | |||
this.videoLive = videoLive | |||
liveVideo => { | |||
this.liveVideo = liveVideo | |||
}, | |||
err => { | |||
@@ -11,7 +11,7 @@ | |||
[validationMessages]="validationMessages" [userVideoChannels]="userVideoChannels" | |||
[videoCaptions]="videoCaptions" [waitTranscodingEnabled]="waitTranscodingEnabled" | |||
type="update" (pluginFieldsAdded)="hydratePluginFieldsFromVideo()" | |||
[videoLive]="videoLive" | |||
[liveVideo]="liveVideo" | |||
></my-video-edit> | |||
<div class="submit-container"> | |||
@@ -5,7 +5,7 @@ import { Notifier } from '@app/core' | |||
import { FormReactive, FormValidatorService, SelectChannelItem } from '@app/shared/shared-forms' | |||
import { VideoCaptionEdit, VideoCaptionService, VideoDetails, VideoEdit, VideoService } from '@app/shared/shared-main' | |||
import { LoadingBarService } from '@ngx-loading-bar/core' | |||
import { VideoPrivacy, VideoLive } from '@shared/models' | |||
import { LiveVideo, VideoPrivacy } from '@shared/models' | |||
import { hydrateFormFromVideo } from './shared/video-edit-utils' | |||
@Component({ | |||
@@ -17,7 +17,7 @@ export class VideoUpdateComponent extends FormReactive implements OnInit { | |||
video: VideoEdit | |||
userVideoChannels: SelectChannelItem[] = [] | |||
videoCaptions: VideoCaptionEdit[] = [] | |||
videoLive: VideoLive | |||
liveVideo: LiveVideo | |||
isUpdatingVideo = false | |||
schedulePublicationPossible = false | |||
@@ -42,11 +42,11 @@ export class VideoUpdateComponent extends FormReactive implements OnInit { | |||
this.route.data | |||
.pipe(map(data => data.videoData)) | |||
.subscribe(({ video, videoChannels, videoCaptions, videoLive }) => { | |||
.subscribe(({ video, videoChannels, videoCaptions, liveVideo }) => { | |||
this.video = new VideoEdit(video) | |||
this.userVideoChannels = videoChannels | |||
this.videoCaptions = videoCaptions | |||
this.videoLive = videoLive | |||
this.liveVideo = liveVideo | |||
this.schedulePublicationPossible = this.video.privacy === VideoPrivacy.PRIVATE | |||
@@ -2,13 +2,13 @@ import { forkJoin, of } from 'rxjs' | |||
import { map, switchMap } from 'rxjs/operators' | |||
import { Injectable } from '@angular/core' | |||
import { ActivatedRouteSnapshot, Resolve } from '@angular/router' | |||
import { VideoCaptionService, VideoChannelService, VideoDetails, VideoLiveService, VideoService } from '@app/shared/shared-main' | |||
import { VideoCaptionService, VideoChannelService, VideoDetails, LiveVideoService, VideoService } from '@app/shared/shared-main' | |||
@Injectable() | |||
export class VideoUpdateResolver implements Resolve<any> { | |||
constructor ( | |||
private videoService: VideoService, | |||
private videoLiveService: VideoLiveService, | |||
private liveVideoService: LiveVideoService, | |||
private videoChannelService: VideoChannelService, | |||
private videoCaptionService: VideoCaptionService | |||
) { | |||
@@ -49,7 +49,7 @@ export class VideoUpdateResolver implements Resolve<any> { | |||
), | |||
video.isLive | |||
? this.videoLiveService.getVideoLive(video.id) | |||
? this.liveVideoService.getVideoLive(video.id) | |||
: of(undefined) | |||
] | |||
} | |||
@@ -29,6 +29,14 @@ | |||
This video will be published on {{ video.scheduledUpdate.updateAt | date: 'full' }}. | |||
</div> | |||
<div i18n class="col-md-12 alert alert-info" *ngIf="isWaitingForLive()"> | |||
This live has not started yet. | |||
</div> | |||
<div i18n class="col-md-12 alert alert-info" *ngIf="isLiveEnded()"> | |||
This live is finished. | |||
</div> | |||
<div class="col-md-12 alert alert-danger" *ngIf="video?.blacklisted"> | |||
<div class="blocked-label" i18n>This video is blocked.</div> | |||
{{ video.blockedReason }} | |||
@@ -113,7 +121,7 @@ | |||
</div> | |||
</div> | |||
<ng-container *ngIf="!isUserLoggedIn()"> | |||
<ng-container *ngIf="!isUserLoggedIn() && !isLive()"> | |||
<button | |||
*ngIf="isVideoDownloadable()" class="action-button action-button-save" | |||
(click)="showDownloadModal()" (keydown.enter)="showDownloadModal()" | |||
@@ -50,6 +50,8 @@ $video-info-margin-left: 44px; | |||
} | |||
#video-wrapper { | |||
$video-height: 66vh; | |||
background-color: #000; | |||
display: flex; | |||
justify-content: center; | |||
@@ -58,6 +60,7 @@ $video-info-margin-left: 44px; | |||
display: flex; | |||
justify-content: center; | |||
flex-grow: 1; | |||
height: $video-height; | |||
} | |||
.remote-server-down { | |||
@@ -84,7 +87,7 @@ $video-info-margin-left: 44px; | |||
::ng-deep .video-js { | |||
width: 100%; | |||
max-width: getPlayerWidth(66vh); | |||
height: 66vh; | |||
height: $video-height; | |||
// VideoJS create an inner video player | |||
video { | |||
@@ -4,7 +4,17 @@ import { catchError } from 'rxjs/operators' | |||
import { PlatformLocation } from '@angular/common' | |||
import { ChangeDetectorRef, Component, ElementRef, Inject, LOCALE_ID, NgZone, OnDestroy, OnInit, ViewChild } from '@angular/core' | |||
import { ActivatedRoute, Router } from '@angular/router' | |||
import { AuthService, AuthUser, ConfirmService, MarkdownService, Notifier, RestExtractor, ServerService, UserService } from '@app/core' | |||
import { | |||
AuthService, | |||
AuthUser, | |||
ConfirmService, | |||
MarkdownService, | |||
Notifier, | |||
PeerTubeSocket, | |||
RestExtractor, | |||
ServerService, | |||
UserService | |||
} from '@app/core' | |||
import { HooksService } from '@app/core/plugins/hooks.service' | |||
import { RedirectService } from '@app/core/routing/redirect.service' | |||
import { isXPercentInViewport, scrollToTop } from '@app/helpers' | |||
@@ -30,6 +40,8 @@ import { environment } from '../../../environments/environment' | |||
import { VideoSupportComponent } from './modal/video-support.component' | |||
import { VideoWatchPlaylistComponent } from './video-watch-playlist.component' | |||
type URLOptions = CustomizationOptions & { playerMode: PlayerMode } | |||
@Component({ | |||
selector: 'my-video-watch', | |||
templateUrl: './video-watch.component.html', | |||
@@ -76,6 +88,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy { | |||
private paramsSub: Subscription | |||
private queryParamsSub: Subscription | |||
private configSub: Subscription | |||
private liveVideosSub: Subscription | |||
private serverConfig: ServerConfig | |||
@@ -99,6 +112,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy { | |||
private videoCaptionService: VideoCaptionService, | |||
private hotkeysService: HotkeysService, | |||
private hooks: HooksService, | |||
private peertubeSocket: PeerTubeSocket, | |||
private location: PlatformLocation, | |||
@Inject(LOCALE_ID) private localeId: string | |||
) { | |||
@@ -165,6 +179,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy { | |||
if (this.paramsSub) this.paramsSub.unsubscribe() | |||
if (this.queryParamsSub) this.queryParamsSub.unsubscribe() | |||
if (this.configSub) this.configSub.unsubscribe() | |||
if (this.liveVideosSub) this.liveVideosSub.unsubscribe() | |||
// Unbind hotkeys | |||
this.hotkeysService.remove(this.hotkeys) | |||
@@ -306,6 +321,18 @@ export class VideoWatchComponent implements OnInit, OnDestroy { | |||
return this.video && this.video.scheduledUpdate !== undefined | |||
} | |||
isLive () { | |||
return !!(this.video?.isLive) | |||
} | |||
isWaitingForLive () { | |||
return this.video?.state.id === VideoState.WAITING_FOR_LIVE | |||
} | |||
isLiveEnded () { | |||
return this.video?.state.id === VideoState.LIVE_ENDED | |||
} | |||
isVideoBlur (video: Video) { | |||
return video.isVideoNSFWForUser(this.user, this.serverConfig) | |||
} | |||
@@ -470,8 +497,10 @@ export class VideoWatchComponent implements OnInit, OnDestroy { | |||
private async onVideoFetched ( | |||
video: VideoDetails, | |||
videoCaptions: VideoCaption[], | |||
urlOptions: CustomizationOptions & { playerMode: PlayerMode } | |||
urlOptions: URLOptions | |||
) { | |||
this.subscribeToLiveEventsIfNeeded(this.video, video) | |||
this.video = video | |||
this.videoCaptions = videoCaptions | |||
@@ -489,6 +518,9 @@ export class VideoWatchComponent implements OnInit, OnDestroy { | |||
if (res === false) return this.location.back() | |||
} | |||
const videoState = this.video.state.id | |||
if (videoState === VideoState.LIVE_ENDED || videoState === VideoState.WAITING_FOR_LIVE) return | |||
// Flush old player if needed | |||
this.flushPlayer() | |||
@@ -794,6 +826,29 @@ export class VideoWatchComponent implements OnInit, OnDestroy { | |||
return !this.player.paused() | |||
} | |||
private async subscribeToLiveEventsIfNeeded (oldVideo: VideoDetails, newVideo: VideoDetails) { | |||
if (!this.liveVideosSub) { | |||
this.liveVideosSub = this.peertubeSocket.getLiveVideosObservable() | |||
.subscribe(({ payload }) => { | |||
if (payload.state !== VideoState.PUBLISHED || this.video.state.id !== VideoState.WAITING_FOR_LIVE) return | |||
const videoUUID = this.video.uuid | |||
// Reset to refetch the video | |||
this.video = undefined | |||
this.loadVideo(videoUUID) | |||
}) | |||
} | |||
if (oldVideo && oldVideo.id !== newVideo.id) { | |||
await this.peertubeSocket.unsubscribeLiveVideos(oldVideo.id) | |||
} | |||
if (!newVideo.isLive) return | |||
await this.peertubeSocket.subscribeToLiveVideosSocket(newVideo.id) | |||
} | |||
private initHotkeys () { | |||
this.hotkeys = [ | |||
// These hotkeys are managed by the player | |||
@@ -4,7 +4,7 @@ import { ToastModule } from 'primeng/toast' | |||
import { CommonModule } from '@angular/common' | |||
import { NgModule, Optional, SkipSelf } from '@angular/core' | |||
import { BrowserAnimationsModule } from '@angular/platform-browser/animations' | |||
import { UserNotificationSocket } from '@app/core/notification/user-notification-socket.service' | |||
import { PeerTubeSocket } from '@app/core/notification/peertube-socket.service' | |||
import { HooksService } from '@app/core/plugins/hooks.service' | |||
import { PluginService } from '@app/core/plugins/plugin.service' | |||
import { UnloggedGuard } from '@app/core/routing/unlogged-guard.service' | |||
@@ -84,7 +84,7 @@ import { LocalStorageService, ScreenService, SessionStorageService } from './wra | |||
RedirectService, | |||
Notifier, | |||
MessageService, | |||
UserNotificationSocket, | |||
PeerTubeSocket, | |||
ServerConfigResolver, | |||
CanDeactivateGuard | |||
] | |||
@@ -1,2 +1,2 @@ | |||
export * from './notifier.service' | |||
export * from './user-notification-socket.service' | |||
export * from './peertube-socket.service' |
@@ -0,0 +1,86 @@ | |||
import { Subject } from 'rxjs' | |||
import { Injectable, NgZone } from '@angular/core' | |||
import { LiveVideoEventPayload, LiveVideoEventType, UserNotification as UserNotificationServer } from '@shared/models' | |||
import { environment } from '../../../environments/environment' | |||
import { AuthService } from '../auth' | |||
export type NotificationEvent = 'new' | 'read' | 'read-all' | |||
@Injectable() | |||
export class PeerTubeSocket { | |||
private io: typeof import ('socket.io-client') | |||
private notificationSubject = new Subject<{ type: NotificationEvent, notification?: UserNotificationServer }>() | |||
private liveVideosSubject = new Subject<{ type: LiveVideoEventType, payload: LiveVideoEventPayload }>() | |||
private notificationSocket: SocketIOClient.Socket | |||
private liveVideosSocket: SocketIOClient.Socket | |||
constructor ( | |||
private auth: AuthService, | |||
private ngZone: NgZone | |||
) {} | |||
async getMyNotificationsSocket () { | |||
await this.initNotificationSocket() | |||
return this.notificationSubject.asObservable() | |||
} | |||
getLiveVideosObservable () { | |||
return this.liveVideosSubject.asObservable() | |||
} | |||
async subscribeToLiveVideosSocket (videoId: number) { | |||
await this.initLiveVideosSocket() | |||
this.liveVideosSocket.emit('subscribe', { videoId }) | |||
} | |||
async unsubscribeLiveVideos (videoId: number) { | |||
if (!this.liveVideosSocket) return | |||
this.liveVideosSocket.emit('unsubscribe', { videoId }) | |||
} | |||
dispatchNotificationEvent (type: NotificationEvent, notification?: UserNotificationServer) { | |||
this.notificationSubject.next({ type, notification }) | |||
} | |||
private async initNotificationSocket () { | |||
if (this.notificationSocket) return | |||
await this.importIOIfNeeded() | |||
this.ngZone.runOutsideAngular(() => { | |||
this.notificationSocket = this.io(environment.apiUrl + '/user-notifications', { | |||
query: { accessToken: this.auth.getAccessToken() } | |||
}) | |||
this.notificationSocket.on('new-notification', (n: UserNotificationServer) => this.dispatchNotificationEvent('new', n)) | |||
}) | |||
} | |||
private async initLiveVideosSocket () { | |||
if (this.liveVideosSocket) return | |||
await this.importIOIfNeeded() | |||
this.ngZone.runOutsideAngular(() => { | |||
this.liveVideosSocket = this.io(environment.apiUrl + '/live-videos') | |||
const type: LiveVideoEventType = 'state-change' | |||
this.liveVideosSocket.on(type, (payload: LiveVideoEventPayload) => this.dispatchLiveVideoEvent(type, payload)) | |||
}) | |||
} | |||
private async importIOIfNeeded () { | |||
if (this.io) return | |||
this.io = (await import('socket.io-client') as any).default | |||
} | |||
private dispatchLiveVideoEvent (type: LiveVideoEventType, payload: LiveVideoEventPayload) { | |||
this.liveVideosSubject.next({ type, payload }) | |||
} | |||
} |
@@ -1,44 +0,0 @@ | |||
import { Subject } from 'rxjs' | |||
import { Injectable, NgZone } from '@angular/core' | |||
import { UserNotification as UserNotificationServer } from '@shared/models' | |||
import { environment } from '../../../environments/environment' | |||
import { AuthService } from '../auth' | |||
export type NotificationEvent = 'new' | 'read' | 'read-all' | |||
@Injectable() | |||
export class UserNotificationSocket { | |||
private notificationSubject = new Subject<{ type: NotificationEvent, notification?: UserNotificationServer }>() | |||
private socket: SocketIOClient.Socket | |||
constructor ( | |||
private auth: AuthService, | |||
private ngZone: NgZone | |||
) {} | |||
dispatch (type: NotificationEvent, notification?: UserNotificationServer) { | |||
this.notificationSubject.next({ type, notification }) | |||
} | |||
async getMyNotificationsSocket () { | |||
await this.initSocket() | |||
return this.notificationSubject.asObservable() | |||
} | |||
private async initSocket () { | |||
if (this.socket) return | |||
// FIXME: import('..') returns a struct module, containing a "default" field corresponding to our sanitizeHtml function | |||
const io: typeof import ('socket.io-client') = (await import('socket.io-client') as any).default | |||
this.ngZone.runOutsideAngular(() => { | |||
this.socket = io(environment.apiUrl + '/user-notifications', { | |||
query: { accessToken: this.auth.getAccessToken() } | |||
}) | |||
this.socket.on('new-notification', (n: UserNotificationServer) => this.dispatch('new', n)) | |||
}) | |||
} | |||
} |
@@ -2,7 +2,7 @@ import { Subject, Subscription } from 'rxjs' | |||
import { filter } from 'rxjs/operators' | |||
import { Component, EventEmitter, Input, Output, OnDestroy, OnInit, ViewChild } from '@angular/core' | |||
import { NavigationEnd, Router } from '@angular/router' | |||
import { Notifier, User, UserNotificationSocket } from '@app/core' | |||
import { Notifier, User, PeerTubeSocket } from '@app/core' | |||
import { UserNotificationService } from '@app/shared/shared-main' | |||
import { NgbPopover } from '@ng-bootstrap/ng-bootstrap' | |||
@@ -27,7 +27,7 @@ export class AvatarNotificationComponent implements OnInit, OnDestroy { | |||
constructor ( | |||
private userNotificationService: UserNotificationService, | |||
private userNotificationSocket: UserNotificationSocket, | |||
private peertubeSocket: PeerTubeSocket, | |||
private notifier: Notifier, | |||
private router: Router | |||
) { | |||
@@ -75,7 +75,7 @@ export class AvatarNotificationComponent implements OnInit, OnDestroy { | |||
} | |||
private async subscribeToNotifications () { | |||
const obs = await this.userNotificationSocket.getMyNotificationsSocket() | |||
const obs = await this.peertubeSocket.getMyNotificationsSocket() | |||
this.notificationSub = obs.subscribe(data => { | |||
if (data.type === 'new') return this.unreadNotifications++ | |||
@@ -23,7 +23,7 @@ import { FeedComponent } from './feeds' | |||
import { LoaderComponent, SmallLoaderComponent } from './loaders' | |||
import { HelpComponent, ListOverflowComponent, TopMenuDropdownComponent } from './misc' | |||
import { UserHistoryService, UserNotificationsComponent, UserNotificationService, UserQuotaComponent } from './users' | |||
import { RedundancyService, VideoImportService, VideoOwnershipService, VideoService, VideoLiveService } from './video' | |||
import { RedundancyService, VideoImportService, VideoOwnershipService, VideoService, LiveVideoService } from './video' | |||
import { VideoCaptionService } from './video-caption' | |||
import { VideoChannelService } from './video-channel' | |||
@@ -142,7 +142,7 @@ import { VideoChannelService } from './video-channel' | |||
RedundancyService, | |||
VideoImportService, | |||
VideoOwnershipService, | |||
VideoLiveService, | |||
LiveVideoService, | |||
VideoService, | |||
VideoCaptionService, | |||
@@ -1,7 +1,7 @@ | |||
import { catchError, map, tap } from 'rxjs/operators' | |||
import { HttpClient, HttpParams } from '@angular/common/http' | |||
import { Injectable } from '@angular/core' | |||
import { ComponentPaginationLight, RestExtractor, RestService, User, UserNotificationSocket, AuthService } from '@app/core' | |||
import { ComponentPaginationLight, RestExtractor, RestService, User, PeerTubeSocket, AuthService } from '@app/core' | |||
import { ResultList, UserNotification as UserNotificationServer, UserNotificationSetting } from '@shared/models' | |||
import { environment } from '../../../../environments/environment' | |||
import { UserNotification } from './user-notification.model' | |||
@@ -17,7 +17,7 @@ export class UserNotificationService { | |||
private auth: AuthService, | |||
private restExtractor: RestExtractor, | |||
private restService: RestService, | |||
private userNotificationSocket: UserNotificationSocket | |||
private peertubeSocket: PeerTubeSocket | |||
) {} | |||
listMyNotifications (parameters: { | |||
@@ -57,7 +57,7 @@ export class UserNotificationService { | |||
return this.authHttp.post(url, body, { headers }) | |||
.pipe( | |||
map(this.restExtractor.extractDataBool), | |||
tap(() => this.userNotificationSocket.dispatch('read')), | |||
tap(() => this.peertubeSocket.dispatchNotificationEvent('read')), | |||
catchError(res => this.restExtractor.handleError(res)) | |||
) | |||
} | |||
@@ -69,12 +69,12 @@ export class UserNotificationService { | |||
return this.authHttp.post(url, {}, { headers }) | |||
.pipe( | |||
map(this.restExtractor.extractDataBool), | |||
tap(() => this.userNotificationSocket.dispatch('read-all')), | |||
tap(() => this.peertubeSocket.dispatchNotificationEvent('read-all')), | |||
catchError(res => this.restExtractor.handleError(res)) | |||
) | |||
} | |||
updateNotificationSettings (user: User, settings: UserNotificationSetting) { | |||
updateNotificationSettings (settings: UserNotificationSetting) { | |||
const url = UserNotificationService.BASE_NOTIFICATION_SETTINGS | |||
return this.authHttp.put(url, settings) | |||
@@ -1,8 +1,8 @@ | |||
export * from './live-video.service' | |||
export * from './redundancy.service' | |||
export * from './video-details.model' | |||
export * from './video-edit.model' | |||
export * from './video-import.service' | |||
export * from './video-live.service' | |||
export * from './video-ownership.service' | |||
export * from './video.model' | |||
export * from './video.service' |
@@ -2,11 +2,11 @@ import { catchError } from 'rxjs/operators' | |||
import { HttpClient } from '@angular/common/http' | |||
import { Injectable } from '@angular/core' | |||
import { RestExtractor } from '@app/core' | |||
import { VideoCreate, VideoLive } from '@shared/models' | |||
import { VideoCreate, LiveVideo } from '@shared/models' | |||
import { environment } from '../../../../environments/environment' | |||
@Injectable() | |||
export class VideoLiveService { | |||
export class LiveVideoService { | |||
static BASE_VIDEO_LIVE_URL = environment.apiUrl + '/api/v1/videos/live/' | |||
constructor ( | |||
@@ -16,13 +16,13 @@ export class VideoLiveService { | |||
goLive (video: VideoCreate) { | |||
return this.authHttp | |||
.post<{ video: { id: number, uuid: string } }>(VideoLiveService.BASE_VIDEO_LIVE_URL, video) | |||
.post<{ video: { id: number, uuid: string } }>(LiveVideoService.BASE_VIDEO_LIVE_URL, video) | |||
.pipe(catchError(err => this.restExtractor.handleError(err))) | |||
} | |||
getVideoLive (videoId: number | string) { | |||
return this.authHttp | |||
.get<VideoLive>(VideoLiveService.BASE_VIDEO_LIVE_URL + videoId) | |||
.get<LiveVideo>(LiveVideoService.BASE_VIDEO_LIVE_URL + videoId) | |||
.pipe(catchError(err => this.restExtractor.handleError(err))) | |||
} | |||
} |
@@ -107,7 +107,7 @@ | |||
<div class="filters"> | |||
<div> | |||
<div class="form-group start-at"> | |||
<div class="form-group start-at" *ngIf="!video.isLive"> | |||
<my-peertube-checkbox | |||
inputName="startAt" [(ngModel)]="customizations.startAtCheckbox" | |||
i18n-labelText labelText="Start at" | |||
@@ -138,7 +138,7 @@ | |||
<div class="advanced-filters collapse-transition" [ngbCollapse]="isAdvancedCustomizationCollapsed"> | |||
<div> | |||
<div class="form-group stop-at"> | |||
<div class="form-group stop-at" *ngIf="!video.isLive"> | |||
<my-peertube-checkbox | |||
inputName="stopAt" [(ngModel)]="customizations.stopAtCheckbox" | |||
i18n-labelText labelText="Stop at" | |||
@@ -167,7 +167,7 @@ | |||
></my-peertube-checkbox> | |||
</div> | |||
<div class="form-group"> | |||
<div class="form-group" *ngIf="!video.isLive"> | |||
<my-peertube-checkbox | |||
inputName="loop" [(ngModel)]="customizations.loop" | |||
i18n-labelText labelText="Loop" | |||
@@ -146,7 +146,10 @@ export class VideoActionsDropdownComponent implements OnChanges { | |||
} | |||
isVideoDownloadable () { | |||
return this.video && this.video instanceof VideoDetails && this.video.downloadEnabled | |||
return this.video && | |||
this.video.isLive !== true && | |||
this.video instanceof VideoDetails && | |||
this.video.downloadEnabled | |||
} | |||
canVideoBeDuplicated () { | |||
@@ -246,9 +246,20 @@ transcoding: | |||
live: | |||
enabled: false | |||
# Limit lives duration | |||
# Set null to disable duration limit | |||
max_duration: 5 hours | |||
# Allow your users to save a replay of their live | |||
# PeerTube will transcode segments in a video file | |||
# If the user daily/total quota is reached, PeerTube will stop the live | |||
# /!\ transcoding.enabled (and not live.transcoding.enabled) has to be true to create a replay | |||
allow_replay: true | |||
rtmp: | |||
port: 1935 | |||
# Allow to transcode the live streaming in multiple live resolutions | |||
transcoding: | |||
enabled: false | |||
threads: 2 | |||
@@ -89,7 +89,7 @@ live: | |||
port: 1935 | |||
transcoding: | |||
enabled: false | |||
enabled: true | |||
threads: 2 | |||
resolutions: | |||
@@ -139,7 +139,8 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { | |||
'email': 5, | |||
'videos-views': 1, | |||
'activitypub-refresher': 1, | |||
'video-redundancy': 1 | |||
'video-redundancy': 1, | |||
'video-live-ending': 1 | |||
} | |||
const JOB_CONCURRENCY: { [id in JobType]: number } = { | |||
'activitypub-http-broadcast': 1, | |||
@@ -152,7 +153,8 @@ const JOB_CONCURRENCY: { [id in JobType]: number } = { | |||
'email': 5, | |||
'videos-views': 1, | |||
'activitypub-refresher': 1, | |||
'video-redundancy': 1 | |||
'video-redundancy': 1, | |||
'video-live-ending': 1 | |||
} | |||
const JOB_TTL: { [id in JobType]: number } = { | |||
'activitypub-http-broadcast': 60000 * 10, // 10 minutes | |||
@@ -165,7 +167,8 @@ const JOB_TTL: { [id in JobType]: number } = { | |||
'email': 60000 * 10, // 10 minutes | |||
'videos-views': undefined, // Unlimited | |||
'activitypub-refresher': 60000 * 10, // 10 minutes | |||
'video-redundancy': 1000 * 3600 * 3 // 3 hours | |||
'video-redundancy': 1000 * 3600 * 3, // 3 hours | |||
'video-live-ending': 1000 * 60 * 10 // 10 minutes | |||
} | |||
const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = { | |||
'videos-views': { | |||
@@ -605,6 +608,7 @@ const HLS_REDUNDANCY_DIRECTORY = join(CONFIG.STORAGE.REDUNDANCY_DIR, 'hls') | |||
const VIDEO_LIVE = { | |||
EXTENSION: '.ts', | |||
CLEANUP_DELAY: 1000 * 60 * 5, // 5 mintues | |||
RTMP: { | |||
CHUNK_SIZE: 60000, | |||
GOP_CACHE: true, | |||
@@ -66,6 +66,7 @@ import { FilteredModelAttributes } from '../../types/sequelize' | |||
import { ActorFollowScoreCache } from '../files-cache' | |||
import { JobQueue } from '../job-queue' | |||
import { Notifier } from '../notifier' | |||
import { PeerTubeSocket } from '../peertube-socket' | |||
import { createPlaceholderThumbnail, createVideoMiniatureFromUrl } from '../thumbnail' | |||
import { setVideoTags } from '../video' | |||
import { autoBlacklistVideoIfNeeded } from '../video-blacklist' | |||
@@ -348,6 +349,7 @@ async function updateVideoFromAP (options: { | |||
video.privacy = videoData.privacy | |||
video.channelId = videoData.channelId | |||
video.views = videoData.views | |||
video.isLive = videoData.isLive | |||
const videoUpdated = await video.save(sequelizeOptions) as MVideoFullLight | |||
@@ -434,6 +436,7 @@ async function updateVideoFromAP (options: { | |||
}) | |||
if (wasPrivateVideo || wasUnlistedVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoUpdated) // Notify our users? | |||
if (videoUpdated.isLive) PeerTubeSocket.Instance.sendVideoLiveNewState(video) | |||
logger.info('Remote video with uuid %s updated', videoObject.uuid) | |||
@@ -0,0 +1,47 @@ | |||
import * as Bull from 'bull' | |||
import { readdir, remove } from 'fs-extra' | |||
import { join } from 'path' | |||
import { getHLSDirectory } from '@server/lib/video-paths' | |||
import { VideoModel } from '@server/models/video/video' | |||
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | |||
import { VideoLiveEndingPayload } from '@shared/models' | |||
import { logger } from '../../../helpers/logger' | |||
async function processVideoLiveEnding (job: Bull.Job) { | |||
const payload = job.data as VideoLiveEndingPayload | |||
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId) | |||
if (!video) { | |||
logger.warn('Video live %d does not exist anymore. Cannot cleanup.', payload.videoId) | |||
return | |||
} | |||
const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) | |||
const hlsDirectory = getHLSDirectory(video, false) | |||
const files = await readdir(hlsDirectory) | |||
for (const filename of files) { | |||
if ( | |||
filename.endsWith('.ts') || | |||
filename.endsWith('.m3u8') || | |||
filename.endsWith('.mpd') || | |||
filename.endsWith('.m4s') || | |||
filename.endsWith('.tmp') | |||
) { | |||
const p = join(hlsDirectory, filename) | |||
remove(p) | |||
.catch(err => logger.error('Cannot remove %s.', p, { err })) | |||
} | |||
} | |||
streamingPlaylist.destroy() | |||
.catch(err => logger.error('Cannot remove live streaming playlist.', { err })) | |||
} | |||
// --------------------------------------------------------------------------- | |||
export { | |||
processVideoLiveEnding | |||
} |
@@ -10,6 +10,7 @@ import { | |||
RefreshPayload, | |||
VideoFileImportPayload, | |||
VideoImportPayload, | |||
VideoLiveEndingPayload, | |||
VideoRedundancyPayload, | |||
VideoTranscodingPayload | |||
} from '../../../shared/models' | |||
@@ -27,6 +28,7 @@ import { processVideosViews } from './handlers/video-views' | |||
import { refreshAPObject } from './handlers/activitypub-refresher' | |||
import { processVideoFileImport } from './handlers/video-file-import' | |||
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | |||
import { processVideoLiveEnding } from './handlers/video-live-ending' | |||
type CreateJobArgument = | |||
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | |||
@@ -39,8 +41,13 @@ type CreateJobArgument = | |||
{ type: 'video-import', payload: VideoImportPayload } | | |||
{ type: 'activitypub-refresher', payload: RefreshPayload } | | |||
{ type: 'videos-views', payload: {} } | | |||
{ type: 'video-live-ending', payload: VideoLiveEndingPayload } | | |||
{ type: 'video-redundancy', payload: VideoRedundancyPayload } | |||
type CreateJobOptions = { | |||
delay?: number | |||
} | |||
const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | |||
'activitypub-http-broadcast': processActivityPubHttpBroadcast, | |||
'activitypub-http-unicast': processActivityPubHttpUnicast, | |||
@@ -52,6 +59,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | |||
'video-import': processVideoImport, | |||
'videos-views': processVideosViews, | |||
'activitypub-refresher': refreshAPObject, | |||
'video-live-ending': processVideoLiveEnding, | |||
'video-redundancy': processVideoRedundancy | |||
} | |||
@@ -66,7 +74,8 @@ const jobTypes: JobType[] = [ | |||
'video-import', | |||
'videos-views', | |||
'activitypub-refresher', | |||
'video-redundancy' | |||
'video-redundancy', | |||
'video-live-ending' | |||
] | |||
class JobQueue { | |||
@@ -122,12 +131,12 @@ class JobQueue { | |||
} | |||
} | |||
createJob (obj: CreateJobArgument): void { | |||
this.createJobWithPromise(obj) | |||
createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { | |||
this.createJobWithPromise(obj, options) | |||
.catch(err => logger.error('Cannot create job.', { err, obj })) | |||
} | |||
createJobWithPromise (obj: CreateJobArgument) { | |||
createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { | |||
const queue = this.queues[obj.type] | |||
if (queue === undefined) { | |||
logger.error('Unknown queue %s: cannot create job.', obj.type) | |||
@@ -137,7 +146,8 @@ class JobQueue { | |||
const jobArgs: Bull.JobOptions = { | |||
backoff: { delay: 60 * 1000, type: 'exponential' }, | |||
attempts: JOB_ATTEMPTS[obj.type], | |||
timeout: JOB_TTL[obj.type] | |||
timeout: JOB_TTL[obj.type], | |||
delay: options.delay | |||
} | |||
return queue.add(obj.payload, jobArgs) | |||
@@ -2,18 +2,22 @@ | |||
import { AsyncQueue, queue } from 'async' | |||
import * as chokidar from 'chokidar' | |||
import { FfmpegCommand } from 'fluent-ffmpeg' | |||
import { ensureDir, readdir, remove } from 'fs-extra' | |||
import { basename, join } from 'path' | |||
import { ensureDir } from 'fs-extra' | |||
import { basename } from 'path' | |||
import { computeResolutionsToTranscode, runLiveMuxing, runLiveTranscoding } from '@server/helpers/ffmpeg-utils' | |||
import { logger } from '@server/helpers/logger' | |||
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' | |||
import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' | |||
import { VideoModel } from '@server/models/video/video' | |||
import { VideoFileModel } from '@server/models/video/video-file' | |||
import { VideoLiveModel } from '@server/models/video/video-live' | |||
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | |||
import { MStreamingPlaylist, MVideo, MVideoLiveVideo } from '@server/types/models' | |||
import { MStreamingPlaylist, MVideoLiveVideo } from '@server/types/models' | |||
import { VideoState, VideoStreamingPlaylistType } from '@shared/models' | |||
import { federateVideoIfNeeded } from './activitypub/videos' | |||
import { buildSha256Segment } from './hls' | |||
import { JobQueue } from './job-queue' | |||
import { PeerTubeSocket } from './peertube-socket' | |||
import { getHLSDirectory } from './video-paths' | |||
const NodeRtmpServer = require('node-media-server/node_rtmp_server') | |||
@@ -47,6 +51,7 @@ class LiveManager { | |||
private static instance: LiveManager | |||
private readonly transSessions = new Map<string, FfmpegCommand>() | |||
private readonly videoSessions = new Map<number, string>() | |||
private readonly segmentsSha256 = new Map<string, Map<string, string>>() | |||
private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam> | |||
@@ -56,7 +61,8 @@ class LiveManager { | |||
} | |||
init () { | |||
this.getContext().nodeEvent.on('postPublish', (sessionId: string, streamPath: string) => { | |||
const events = this.getContext().nodeEvent | |||
events.on('postPublish', (sessionId: string, streamPath: string) => { | |||
logger.debug('RTMP received stream', { id: sessionId, streamPath }) | |||
const splittedPath = streamPath.split('/') | |||
@@ -69,7 +75,7 @@ class LiveManager { | |||
.catch(err => logger.error('Cannot handle sessions.', { err })) | |||
}) | |||
this.getContext().nodeEvent.on('donePublish', sessionId => { | |||
events.on('donePublish', sessionId => { | |||
this.abortSession(sessionId) | |||
}) | |||
@@ -115,6 +121,16 @@ class LiveManager { | |||
return this.segmentsSha256.get(videoUUID) | |||
} | |||
stopSessionOf (videoId: number) { | |||
const sessionId = this.videoSessions.get(videoId) | |||
if (!sessionId) return | |||
this.abortSession(sessionId) | |||
this.onEndTransmuxing(videoId) | |||
.catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err })) | |||
} | |||
private getContext () { | |||
return context | |||
} | |||
@@ -135,6 +151,13 @@ class LiveManager { | |||
} | |||
const video = videoLive.Video | |||
if (video.isBlacklisted()) { | |||
logger.warn('Video is blacklisted. Refusing stream %s.', streamKey) | |||
return this.abortSession(sessionId) | |||
} | |||
this.videoSessions.set(video.id, sessionId) | |||
const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid) | |||
const session = this.getContext().sessions.get(sessionId) | |||
@@ -154,11 +177,6 @@ class LiveManager { | |||
type: VideoStreamingPlaylistType.HLS | |||
}, { returning: true }) as [ MStreamingPlaylist, boolean ] | |||
video.state = VideoState.PUBLISHED | |||
await video.save() | |||
// FIXME: federation? | |||
return this.runMuxing({ | |||
sessionId, | |||
videoLive, | |||
@@ -207,11 +225,46 @@ class LiveManager { | |||
this.transSessions.set(sessionId, ffmpegExec) | |||
const videoUUID = videoLive.Video.uuid | |||
const tsWatcher = chokidar.watch(outPath + '/*.ts') | |||
const updateHandler = segmentPath => { | |||
this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID }) | |||
} | |||
const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID }) | |||
tsWatcher.on('add', p => updateHandler(p)) | |||
tsWatcher.on('change', p => updateHandler(p)) | |||
tsWatcher.on('unlink', p => deleteHandler(p)) | |||
const masterWatcher = chokidar.watch(outPath + '/master.m3u8') | |||
masterWatcher.on('add', async () => { | |||
try { | |||
const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId) | |||
video.state = VideoState.PUBLISHED | |||
await video.save() | |||
videoLive.Video = video | |||
await federateVideoIfNeeded(video, false) | |||
PeerTubeSocket.Instance.sendVideoLiveNewState(video) | |||
} catch (err) { | |||
logger.error('Cannot federate video %d.', videoLive.videoId, { err }) | |||
} finally { | |||
masterWatcher.close() | |||
.catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err })) | |||
} | |||
}) | |||
const onFFmpegEnded = () => { | |||
watcher.close() | |||
.catch(err => logger.error('Cannot close watcher of %s.', outPath, { err })) | |||
logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', streamPath) | |||
this.onEndTransmuxing(videoLive.Video, playlist, streamPath, outPath) | |||
Promise.all([ tsWatcher.close(), masterWatcher.close() ]) | |||
.catch(err => logger.error('Cannot close watchers of %s.', outPath, { err })) | |||
this.onEndTransmuxing(videoLive.Video.id) | |||
.catch(err => logger.error('Error in closed transmuxing.', { err })) | |||
} | |||
@@ -225,44 +278,30 @@ class LiveManager { | |||
}) | |||
ffmpegExec.on('end', () => onFFmpegEnded()) | |||
const videoUUID = videoLive.Video.uuid | |||
const watcher = chokidar.watch(outPath + '/*.ts') | |||
const updateHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID }) | |||
const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID }) | |||
watcher.on('add', p => updateHandler(p)) | |||
watcher.on('change', p => updateHandler(p)) | |||
watcher.on('unlink', p => deleteHandler(p)) | |||
} | |||
private async onEndTransmuxing (video: MVideo, playlist: MStreamingPlaylist, streamPath: string, outPath: string) { | |||
logger.info('RTMP transmuxing for %s ended.', streamPath) | |||
private async onEndTransmuxing (videoId: number) { | |||
try { | |||
const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | |||
if (!fullVideo) return | |||
const files = await readdir(outPath) | |||
JobQueue.Instance.createJob({ | |||
type: 'video-live-ending', | |||
payload: { | |||
videoId: fullVideo.id | |||
} | |||
}, { delay: VIDEO_LIVE.CLEANUP_DELAY }) | |||
for (const filename of files) { | |||
if ( | |||
filename.endsWith('.ts') || | |||
filename.endsWith('.m3u8') || | |||
filename.endsWith('.mpd') || | |||
filename.endsWith('.m4s') || | |||
filename.endsWith('.tmp') | |||
) { | |||
const p = join(outPath, filename) | |||
// FIXME: use end | |||
fullVideo.state = VideoState.WAITING_FOR_LIVE | |||
await fullVideo.save() | |||
remove(p) | |||
.catch(err => logger.error('Cannot remove %s.', p, { err })) | |||
} | |||
} | |||
PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) | |||
playlist.destroy() | |||
.catch(err => logger.error('Cannot remove live streaming playlist.', { err })) | |||
video.state = VideoState.LIVE_ENDED | |||
video.save() | |||
.catch(err => logger.error('Cannot save new video state of live streaming.', { err })) | |||
await federateVideoIfNeeded(fullVideo, false) | |||
} catch (err) { | |||
logger.error('Cannot save/federate new video state of live streaming.', { err }) | |||
} | |||
} | |||
private async addSegmentSha (options: SegmentSha256QueueParam) { | |||
@@ -1,14 +1,18 @@ | |||
import * as SocketIO from 'socket.io' | |||
import { authenticateSocket } from '../middlewares' | |||
import { logger } from '../helpers/logger' | |||
import { Socket } from 'dgram' | |||
import { Server } from 'http' | |||
import * as SocketIO from 'socket.io' | |||
import { MVideo } from '@server/types/models' | |||
import { UserNotificationModelForApi } from '@server/types/models/user' | |||
import { LiveVideoEventPayload, LiveVideoEventType } from '@shared/models' | |||
import { logger } from '../helpers/logger' | |||
import { authenticateSocket } from '../middlewares' | |||
class PeerTubeSocket { | |||
private static instance: PeerTubeSocket | |||
private userNotificationSockets: { [ userId: number ]: SocketIO.Socket[] } = {} | |||
private liveVideosNamespace: SocketIO.Namespace | |||
private constructor () {} | |||
@@ -32,19 +36,37 @@ class PeerTubeSocket { | |||
this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket) | |||
}) | |||
}) | |||
this.liveVideosNamespace = io.of('/live-videos') | |||
.on('connection', socket => { | |||
socket.on('subscribe', ({ videoId }) => socket.join(videoId)) | |||
socket.on('unsubscribe', ({ videoId }) => socket.leave(videoId)) | |||
}) | |||
} | |||
sendNotification (userId: number, notification: UserNotificationModelForApi) { | |||
const sockets = this.userNotificationSockets[userId] | |||
if (!sockets) return | |||
logger.debug('Sending user notification to user %d.', userId) | |||
const notificationMessage = notification.toFormattedJSON() | |||
for (const socket of sockets) { | |||
socket.emit('new-notification', notificationMessage) | |||
} | |||
} | |||
sendVideoLiveNewState (video: MVideo) { | |||
const data: LiveVideoEventPayload = { state: video.state } | |||
const type: LiveVideoEventType = 'state-change' | |||
logger.debug('Sending video live new state notification of %s.', video.url) | |||
this.liveVideosNamespace | |||
.in(video.id) | |||
.emit(type, data) | |||
} | |||
static get Instance () { | |||
return this.instance || (this.instance = new this()) | |||
} | |||
@@ -17,6 +17,7 @@ import { sendDeleteVideo } from './activitypub/send' | |||
import { federateVideoIfNeeded } from './activitypub/videos' | |||
import { Notifier } from './notifier' | |||
import { Hooks } from './plugins/hooks' | |||
import { LiveManager } from './live-manager' | |||
async function autoBlacklistVideoIfNeeded (parameters: { | |||
video: MVideoWithBlacklistLight | |||
@@ -73,6 +74,10 @@ async function blacklistVideo (videoInstance: MVideoAccountLight, options: Video | |||
await sendDeleteVideo(videoInstance, undefined) | |||
} | |||
if (videoInstance.isLive) { | |||
LiveManager.Instance.stopSessionOf(videoInstance.id) | |||
} | |||
Notifier.Instance.notifyOnVideoBlacklist(blacklist) | |||
} | |||
@@ -1,14 +1,21 @@ | |||
import { AllowNull, BelongsTo, Column, CreatedAt, DataType, DefaultScope, ForeignKey, Model, Table, UpdatedAt } from 'sequelize-typescript' | |||
import { WEBSERVER } from '@server/initializers/constants' | |||
import { MVideoLive, MVideoLiveVideo } from '@server/types/models' | |||
import { VideoLive } from '@shared/models/videos/video-live.model' | |||
import { LiveVideo, VideoState } from '@shared/models' | |||
import { VideoModel } from './video' | |||
import { VideoBlacklistModel } from './video-blacklist' | |||
@DefaultScope(() => ({ | |||
include: [ | |||
{ | |||
model: VideoModel, | |||
required: true | |||
required: true, | |||
include: [ | |||
{ | |||
model: VideoBlacklistModel, | |||
required: false | |||
} | |||
] | |||
} | |||
] | |||
})) | |||
@@ -49,7 +56,22 @@ export class VideoLiveModel extends Model<VideoLiveModel> { | |||
const query = { | |||
where: { | |||
streamKey | |||
} | |||
}, | |||
include: [ | |||
{ | |||
model: VideoModel.unscoped(), | |||
required: true, | |||
where: { | |||
state: VideoState.WAITING_FOR_LIVE | |||
}, | |||
include: [ | |||
{ | |||
model: VideoBlacklistModel.unscoped(), | |||
required: false | |||
} | |||
] | |||
} | |||
] | |||
} | |||
return VideoLiveModel.findOne<MVideoLiveVideo>(query) | |||
@@ -65,7 +87,7 @@ export class VideoLiveModel extends Model<VideoLiveModel> { | |||
return VideoLiveModel.findOne<MVideoLive>(query) | |||
} | |||
toFormattedJSON (): VideoLive { | |||
toFormattedJSON (): LiveVideo { | |||
return { | |||
rtmpUrl: WEBSERVER.RTMP_URL, | |||
streamKey: this.streamKey | |||
@@ -153,6 +153,17 @@ export class VideoStreamingPlaylistModel extends Model<VideoStreamingPlaylistMod | |||
return VideoStreamingPlaylistModel.findByPk(id, options) | |||
} | |||
static loadHLSPlaylistByVideo (videoId: number) { | |||
const options = { | |||
where: { | |||
type: VideoStreamingPlaylistType.HLS, | |||
videoId | |||
} | |||
} | |||
return VideoStreamingPlaylistModel.findOne(options) | |||
} | |||
static getHlsPlaylistFilename (resolution: number) { | |||
return resolution + '.m3u8' | |||
} | |||
@@ -127,6 +127,7 @@ import { VideoShareModel } from './video-share' | |||
import { VideoStreamingPlaylistModel } from './video-streaming-playlist' | |||
import { VideoTagModel } from './video-tag' | |||
import { VideoViewModel } from './video-view' | |||
import { LiveManager } from '@server/lib/live-manager' | |||
export enum ScopeNames { | |||
AVAILABLE_FOR_LIST_IDS = 'AVAILABLE_FOR_LIST_IDS', | |||
@@ -799,6 +800,13 @@ export class VideoModel extends Model<VideoModel> { | |||
return undefined | |||
} | |||
@BeforeDestroy | |||
static stopLiveIfNeeded (instance: VideoModel) { | |||
if (!instance.isLive) return | |||
return LiveManager.Instance.stopSessionOf(instance.id) | |||
} | |||
@BeforeDestroy | |||
static invalidateCache (instance: VideoModel) { | |||
ModelCache.Instance.invalidateCache('video', instance.id) | |||
@@ -16,6 +16,7 @@ export type JobType = | |||
| 'videos-views' | |||
| 'activitypub-refresher' | |||
| 'video-redundancy' | |||
| 'video-live-ending' | |||
export interface Job { | |||
id: number | |||
@@ -126,3 +127,7 @@ export type VideoTranscodingPayload = | |||
| NewResolutionTranscodingPayload | |||
| OptimizeTranscodingPayload | |||
| MergeAudioTranscodingPayload | |||
export interface VideoLiveEndingPayload { | |||
videoId: number | |||
} |
@@ -1,6 +1,7 @@ | |||
export * from './blacklist' | |||
export * from './caption' | |||
export * from './channel' | |||
export * from './live' | |||
export * from './import' | |||
export * from './playlist' | |||
export * from './rate' | |||
@@ -19,7 +20,7 @@ export * from './video-create.model' | |||
export * from './video-file-metadata' | |||
export * from './video-file.model' | |||
export * from './video-live.model' | |||
export * from './live/live-video.model' | |||
export * from './video-privacy.enum' | |||
export * from './video-query.type' | |||
@@ -0,0 +1,3 @@ | |||
export * from './live-video-event-payload.model' | |||
export * from './live-video-event.type' | |||
export * from './live-video.model' |
@@ -0,0 +1,5 @@ | |||
import { VideoState } from '../video-state.enum' | |||
export interface LiveVideoEventPayload { | |||
state: VideoState | |||
} |
@@ -0,0 +1 @@ | |||
export type LiveVideoEventType = 'state-change' |
@@ -1,4 +1,4 @@ | |||
export interface VideoLive { | |||
export interface LiveVideo { | |||
rtmpUrl: string | |||
streamKey: string | |||
} |