Federated video streaming platform using ActivityPub and P2P in the web browser with Angular. https://joinpeertube.org/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

130 lines
3.1 KiB

  1. import * as request from 'supertest'
  2. import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
  3. import { getDebug, makeGetRequest } from '../../../shared/extra-utils'
  4. import { Job, JobState, JobType, ServerDebug } from '../../models'
  5. import { wait } from '../miscs/miscs'
  6. import { ServerInfo } from './servers'
  /**
   * Builds the API path used to list jobs.
   *
   * @param state - optional job state; when truthy it is appended as an extra path segment
   * @returns `/api/v1/jobs`, or `/api/v1/jobs/<state>` when a state is given
   */
  function buildJobsUrl (state?: JobState) {
    const basePath = '/api/v1/jobs'

    return state
      ? basePath + '/' + state
      : basePath
  }
  /**
   * Issues an authenticated GET on the jobs listing endpoint of a server.
   *
   * The request asks for JSON, and expects a 200 status together with a
   * JSON `Content-Type` in the response.
   *
   * @param url - base URL handed to supertest
   * @param accessToken - token sent as `Authorization: Bearer <token>`
   * @param state - optional job state used to build the request path
   * @returns the supertest request chain
   */
  function getJobsList (url: string, accessToken: string, state?: JobState) {
    const bearer = 'Bearer ' + accessToken
    const jobsRequest = request(url).get(buildJobsUrl(state))

    return jobsRequest
      .set('Accept', 'application/json')
      .set('Authorization', bearer)
      .expect(HttpStatusCode.OK_200)
      .expect('Content-Type', /json/)
  }
  /**
   * Lists jobs with pagination, sorting and an optional job type filter.
   *
   * `start`, `count`, `sort` and `jobType` are forwarded as query parameters;
   * `state` only influences the request path. A 200 status is expected.
   *
   * @param options - target server URL, access token and listing parameters
   * @returns whatever `makeGetRequest` returns for this request
   */
  function getJobsListPaginationAndSort (options: {
    url: string
    accessToken: string
    start: number
    count: number
    sort: string
    state?: JobState
    jobType?: JobType
  }) {
    return makeGetRequest({
      url: options.url,
      path: buildJobsUrl(options.state),
      token: options.accessToken,
      statusCodeExpected: HttpStatusCode.OK_200,
      query: {
        start: options.start,
        count: options.count,
        sort: options.sort,
        jobType: options.jobType
      }
    })
  }
  /**
   * Waits until the given server(s) no longer report pending work.
   *
   * A server is considered busy when either:
   *  - one of the first 10 jobs (sorted by `-createdAt`) in the `waiting`, `active`
   *    or `delayed` state has a type other than `videos-views` / `activitypub-cleaner`, or
   *  - its debug endpoint reports `activityPubMessagesWaiting !== 0`.
   *
   * The function only resolves after two consecutive idle checks, separated by a pause.
   * The pause length comes from the `NODE_PENDING_JOB_WAIT` environment variable
   * (parsed as a base-10 integer) and defaults to 250 (presumably milliseconds;
   * the unit is defined by `wait`).
   *
   * @param serversArg - a single server or an array of servers to poll
   */
  async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
    const envDelay = process.env.NODE_PENDING_JOB_WAIT
    const pollDelay = envDelay ? parseInt(envDelay, 10) : 250

    const targets: ServerInfo[] = Array.isArray(serversArg) ? serversArg : [ serversArg ]

    const watchedStates: JobState[] = [ 'waiting', 'active', 'delayed' ]
    // Job types that are skipped when deciding whether a server is still busy
    const ignoredTypes = [ 'videos-views', 'activitypub-cleaner' ]

    // Fires every probe at once and resolves to true if any of them found pending work
    const hasPendingWork = async () => {
      const probes: Promise<boolean>[] = []

      for (const server of targets) {
        for (const state of watchedStates) {
          const jobsProbe = getJobsListPaginationAndSort({
            url: server.url,
            accessToken: server.accessToken,
            state,
            start: 0,
            count: 10,
            sort: '-createdAt'
          }).then(res => (res.body.data as Job[]).some(job => !ignoredTypes.includes(job.type)))

          probes.push(jobsProbe)
        }

        const debugProbe = getDebug(server.url, server.accessToken)
          .then(res => (res.body as ServerDebug).activityPubMessagesWaiting !== 0)

        probes.push(debugProbe)
      }

      const outcomes = await Promise.all(probes)
      return outcomes.includes(true)
    }

    while (true) {
      if (!(await hasPendingWork())) {
        // Looks idle: check a second time after a pause, in case new jobs were created meanwhile
        await wait(pollDelay)

        if (!(await hasPendingWork())) return
      }

      // Still busy: pause before polling again
      await wait(pollDelay)
    }
  }
  101. // ---------------------------------------------------------------------------
  102. export {
  103. getJobsList,
  104. waitJobs,
  105. getJobsListPaginationAndSort
  106. }