Federated video streaming platform using ActivityPub and P2P in the web browser with Angular. https://joinpeertube.org/
25'ten fazla konu seçemezsiniz Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 
 
 
 
 

291 satır
9.2 KiB

  1. import Bull, { Job, JobOptions, Queue } from 'bull'
  2. import { jobStates } from '@server/helpers/custom-validators/jobs'
  3. import { CONFIG } from '@server/initializers/config'
  4. import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
  5. import {
  6. ActivitypubFollowPayload,
  7. ActivitypubHttpBroadcastPayload,
  8. ActivitypubHttpFetcherPayload,
  9. ActivitypubHttpUnicastPayload,
  10. ActorKeysPayload,
  11. DeleteResumableUploadMetaFilePayload,
  12. EmailPayload,
  13. JobState,
  14. JobType,
  15. MoveObjectStoragePayload,
  16. RefreshPayload,
  17. VideoFileImportPayload,
  18. VideoImportPayload,
  19. VideoLiveEndingPayload,
  20. VideoRedundancyPayload,
  21. VideoTranscodingPayload
  22. } from '../../../shared/models'
  23. import { logger } from '../../helpers/logger'
  24. import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
  25. import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
  26. import { processActivityPubFollow } from './handlers/activitypub-follow'
  27. import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
  28. import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
  29. import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
  30. import { refreshAPObject } from './handlers/activitypub-refresher'
  31. import { processActorKeys } from './handlers/actor-keys'
  32. import { processEmail } from './handlers/email'
  33. import { processMoveToObjectStorage } from './handlers/move-to-object-storage'
  34. import { processVideoFileImport } from './handlers/video-file-import'
  35. import { processVideoImport } from './handlers/video-import'
  36. import { processVideoLiveEnding } from './handlers/video-live-ending'
  37. import { processVideoTranscoding } from './handlers/video-transcoding'
  38. import { processVideosViewsStats } from './handlers/video-views-stats'
  39. type CreateJobArgument =
  40. { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  41. { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  42. { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  43. { type: 'activitypub-http-cleaner', payload: {} } |
  44. { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
  45. { type: 'video-file-import', payload: VideoFileImportPayload } |
  46. { type: 'video-transcoding', payload: VideoTranscodingPayload } |
  47. { type: 'email', payload: EmailPayload } |
  48. { type: 'video-import', payload: VideoImportPayload } |
  49. { type: 'activitypub-refresher', payload: RefreshPayload } |
  50. { type: 'videos-views-stats', payload: {} } |
  51. { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
  52. { type: 'actor-keys', payload: ActorKeysPayload } |
  53. { type: 'video-redundancy', payload: VideoRedundancyPayload } |
  54. { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
  55. { type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
  56. export type CreateJobOptions = {
  57. delay?: number
  58. priority?: number
  59. }
  60. const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
  61. 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
  62. 'activitypub-http-unicast': processActivityPubHttpUnicast,
  63. 'activitypub-http-fetcher': processActivityPubHttpFetcher,
  64. 'activitypub-cleaner': processActivityPubCleaner,
  65. 'activitypub-follow': processActivityPubFollow,
  66. 'video-file-import': processVideoFileImport,
  67. 'video-transcoding': processVideoTranscoding,
  68. 'email': processEmail,
  69. 'video-import': processVideoImport,
  70. 'videos-views-stats': processVideosViewsStats,
  71. 'activitypub-refresher': refreshAPObject,
  72. 'video-live-ending': processVideoLiveEnding,
  73. 'actor-keys': processActorKeys,
  74. 'video-redundancy': processVideoRedundancy,
  75. 'move-to-object-storage': processMoveToObjectStorage
  76. }
  77. const jobTypes: JobType[] = [
  78. 'activitypub-follow',
  79. 'activitypub-http-broadcast',
  80. 'activitypub-http-fetcher',
  81. 'activitypub-http-unicast',
  82. 'activitypub-cleaner',
  83. 'email',
  84. 'video-transcoding',
  85. 'video-file-import',
  86. 'video-import',
  87. 'videos-views-stats',
  88. 'activitypub-refresher',
  89. 'video-redundancy',
  90. 'actor-keys',
  91. 'video-live-ending',
  92. 'move-to-object-storage'
  93. ]
  94. class JobQueue {
  95. private static instance: JobQueue
  96. private queues: { [id in JobType]?: Queue } = {}
  97. private initialized = false
  98. private jobRedisPrefix: string
  99. private constructor () {
  100. }
  101. init (produceOnly = false) {
  102. // Already initialized
  103. if (this.initialized === true) return
  104. this.initialized = true
  105. this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
  106. const queueOptions: Bull.QueueOptions = {
  107. prefix: this.jobRedisPrefix,
  108. redis: {
  109. password: CONFIG.REDIS.AUTH,
  110. db: CONFIG.REDIS.DB,
  111. host: CONFIG.REDIS.HOSTNAME,
  112. port: CONFIG.REDIS.PORT,
  113. path: CONFIG.REDIS.SOCKET
  114. },
  115. settings: {
  116. maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts
  117. }
  118. }
  119. for (const handlerName of (Object.keys(handlers) as JobType[])) {
  120. const queue = new Bull(handlerName, queueOptions)
  121. if (produceOnly) {
  122. queue.pause(true)
  123. .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err }))
  124. }
  125. const handler = handlers[handlerName]
  126. queue.process(this.getJobConcurrency(handlerName), handler)
  127. .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err }))
  128. queue.on('failed', (job, err) => {
  129. logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err })
  130. })
  131. queue.on('error', err => {
  132. logger.error('Error in job queue %s.', handlerName, { err })
  133. })
  134. this.queues[handlerName] = queue
  135. }
  136. this.addRepeatableJobs()
  137. }
  138. terminate () {
  139. for (const queueName of Object.keys(this.queues)) {
  140. const queue = this.queues[queueName]
  141. queue.close()
  142. }
  143. }
  144. createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
  145. this.createJobWithPromise(obj, options)
  146. .catch(err => logger.error('Cannot create job.', { err, obj }))
  147. }
  148. createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
  149. const queue = this.queues[obj.type]
  150. if (queue === undefined) {
  151. logger.error('Unknown queue %s: cannot create job.', obj.type)
  152. return
  153. }
  154. const jobArgs: JobOptions = {
  155. backoff: { delay: 60 * 1000, type: 'exponential' },
  156. attempts: JOB_ATTEMPTS[obj.type],
  157. timeout: JOB_TTL[obj.type],
  158. priority: options.priority,
  159. delay: options.delay
  160. }
  161. return queue.add(obj.payload, jobArgs)
  162. }
  163. async listForApi (options: {
  164. state?: JobState
  165. start: number
  166. count: number
  167. asc?: boolean
  168. jobType: JobType
  169. }): Promise<Job[]> {
  170. const { state, start, count, asc, jobType } = options
  171. const states = state ? [ state ] : jobStates
  172. let results: Job[] = []
  173. const filteredJobTypes = this.filterJobTypes(jobType)
  174. for (const jobType of filteredJobTypes) {
  175. const queue = this.queues[jobType]
  176. if (queue === undefined) {
  177. logger.error('Unknown queue %s to list jobs.', jobType)
  178. continue
  179. }
  180. const jobs = await queue.getJobs(states, 0, start + count, asc)
  181. results = results.concat(jobs)
  182. }
  183. results.sort((j1: any, j2: any) => {
  184. if (j1.timestamp < j2.timestamp) return -1
  185. else if (j1.timestamp === j2.timestamp) return 0
  186. return 1
  187. })
  188. if (asc === false) results.reverse()
  189. return results.slice(start, start + count)
  190. }
  191. async count (state: JobState, jobType?: JobType): Promise<number> {
  192. const states = state ? [ state ] : jobStates
  193. let total = 0
  194. const filteredJobTypes = this.filterJobTypes(jobType)
  195. for (const type of filteredJobTypes) {
  196. const queue = this.queues[type]
  197. if (queue === undefined) {
  198. logger.error('Unknown queue %s to count jobs.', type)
  199. continue
  200. }
  201. const counts = await queue.getJobCounts()
  202. for (const s of states) {
  203. total += counts[s]
  204. }
  205. }
  206. return total
  207. }
  208. async removeOldJobs () {
  209. for (const key of Object.keys(this.queues)) {
  210. const queue = this.queues[key]
  211. await queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
  212. }
  213. }
  214. private addRepeatableJobs () {
  215. this.queues['videos-views-stats'].add({}, {
  216. repeat: REPEAT_JOBS['videos-views-stats']
  217. }).catch(err => logger.error('Cannot add repeatable job.', { err }))
  218. if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
  219. this.queues['activitypub-cleaner'].add({}, {
  220. repeat: REPEAT_JOBS['activitypub-cleaner']
  221. }).catch(err => logger.error('Cannot add repeatable job.', { err }))
  222. }
  223. }
  224. private filterJobTypes (jobType?: JobType) {
  225. if (!jobType) return jobTypes
  226. return jobTypes.filter(t => t === jobType)
  227. }
  228. private getJobConcurrency (jobType: JobType) {
  229. if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
  230. if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY
  231. return JOB_CONCURRENCY[jobType]
  232. }
  233. static get Instance () {
  234. return this.instance || (this.instance = new this())
  235. }
  236. }
  237. // ---------------------------------------------------------------------------
  238. export {
  239. jobTypes,
  240. JobQueue
  241. }