Streaming: Refactor move database and redis logic into separate files (#31567)
This commit is contained in:
		
							parent
							
								
									a7f8417795
								
							
						
					
					
						commit
						4118688fba
					
				
							
								
								
									
										128
									
								
								streaming/database.js
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										128
									
								
								streaming/database.js
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,128 @@
 | 
				
			|||||||
 | 
					import pg from 'pg';
 | 
				
			||||||
 | 
					import pgConnectionString from 'pg-connection-string';
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import { parseIntFromEnvValue } from './utils.js';
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * @param {NodeJS.ProcessEnv} env the `process.env` value to read configuration from
 | 
				
			||||||
 | 
					 * @param {string} environment
 | 
				
			||||||
 | 
					 * @returns {pg.PoolConfig} the configuration for the PostgreSQL connection
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					export function configFromEnv(env, environment) {
 | 
				
			||||||
 | 
					  /** @type {Record<string, pg.PoolConfig>} */
 | 
				
			||||||
 | 
					  const pgConfigs = {
 | 
				
			||||||
 | 
					    development: {
 | 
				
			||||||
 | 
					      user: env.DB_USER || pg.defaults.user,
 | 
				
			||||||
 | 
					      password: env.DB_PASS || pg.defaults.password,
 | 
				
			||||||
 | 
					      database: env.DB_NAME || 'mastodon_development',
 | 
				
			||||||
 | 
					      host: env.DB_HOST || pg.defaults.host,
 | 
				
			||||||
 | 
					      port: parseIntFromEnvValue(env.DB_PORT, pg.defaults.port ?? 5432, 'DB_PORT')
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    production: {
 | 
				
			||||||
 | 
					      user: env.DB_USER || 'mastodon',
 | 
				
			||||||
 | 
					      password: env.DB_PASS || '',
 | 
				
			||||||
 | 
					      database: env.DB_NAME || 'mastodon_production',
 | 
				
			||||||
 | 
					      host: env.DB_HOST || 'localhost',
 | 
				
			||||||
 | 
					      port: parseIntFromEnvValue(env.DB_PORT, 5432, 'DB_PORT')
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					  };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  /**
 | 
				
			||||||
 | 
					   * @type {pg.PoolConfig}
 | 
				
			||||||
 | 
					   */
 | 
				
			||||||
 | 
					  let baseConfig = {};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (env.DATABASE_URL) {
 | 
				
			||||||
 | 
					    const parsedUrl = pgConnectionString.parse(env.DATABASE_URL);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // The result of dbUrlToConfig from pg-connection-string is not type
 | 
				
			||||||
 | 
					    // compatible with pg.PoolConfig, since parts of the connection URL may be
 | 
				
			||||||
 | 
					    // `null` when pg.PoolConfig expects `undefined`, as such we have to
 | 
				
			||||||
 | 
					    // manually create the baseConfig object from the properties of the
 | 
				
			||||||
 | 
					    // parsedUrl.
 | 
				
			||||||
 | 
					    //
 | 
				
			||||||
 | 
					    // For more information see:
 | 
				
			||||||
 | 
					    // https://github.com/brianc/node-postgres/issues/2280
 | 
				
			||||||
 | 
					    //
 | 
				
			||||||
 | 
					    // FIXME: clean up once brianc/node-postgres#3128 lands
 | 
				
			||||||
 | 
					    if (typeof parsedUrl.password === 'string') baseConfig.password = parsedUrl.password;
 | 
				
			||||||
 | 
					    if (typeof parsedUrl.host === 'string') baseConfig.host = parsedUrl.host;
 | 
				
			||||||
 | 
					    if (typeof parsedUrl.user === 'string') baseConfig.user = parsedUrl.user;
 | 
				
			||||||
 | 
					    if (typeof parsedUrl.port === 'string') {
 | 
				
			||||||
 | 
					      const parsedPort = parseInt(parsedUrl.port, 10);
 | 
				
			||||||
 | 
					      if (isNaN(parsedPort)) {
 | 
				
			||||||
 | 
					        throw new Error('Invalid port specified in DATABASE_URL environment variable');
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					      baseConfig.port = parsedPort;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    if (typeof parsedUrl.database === 'string') baseConfig.database = parsedUrl.database;
 | 
				
			||||||
 | 
					    if (typeof parsedUrl.options === 'string') baseConfig.options = parsedUrl.options;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // The pg-connection-string type definition isn't correct, as parsedUrl.ssl
 | 
				
			||||||
 | 
					    // can absolutely be an Object, this is to work around these incorrect
 | 
				
			||||||
 | 
					    // types, including the casting of parsedUrl.ssl to Record<string, any>
 | 
				
			||||||
 | 
					    if (typeof parsedUrl.ssl === 'boolean') {
 | 
				
			||||||
 | 
					      baseConfig.ssl = parsedUrl.ssl;
 | 
				
			||||||
 | 
					    } else if (typeof parsedUrl.ssl === 'object' && !Array.isArray(parsedUrl.ssl) && parsedUrl.ssl !== null) {
 | 
				
			||||||
 | 
					      /** @type {Record<string, any>} */
 | 
				
			||||||
 | 
					      const sslOptions = parsedUrl.ssl;
 | 
				
			||||||
 | 
					      baseConfig.ssl = {};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      baseConfig.ssl.cert = sslOptions.cert;
 | 
				
			||||||
 | 
					      baseConfig.ssl.key = sslOptions.key;
 | 
				
			||||||
 | 
					      baseConfig.ssl.ca = sslOptions.ca;
 | 
				
			||||||
 | 
					      baseConfig.ssl.rejectUnauthorized = sslOptions.rejectUnauthorized;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    // Support overriding the database password in the connection URL
 | 
				
			||||||
 | 
					    if (!baseConfig.password && env.DB_PASS) {
 | 
				
			||||||
 | 
					      baseConfig.password = env.DB_PASS;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  } else if (Object.hasOwn(pgConfigs, environment)) {
 | 
				
			||||||
 | 
					    baseConfig = pgConfigs[environment];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if (env.DB_SSLMODE) {
 | 
				
			||||||
 | 
					      switch(env.DB_SSLMODE) {
 | 
				
			||||||
 | 
					      case 'disable':
 | 
				
			||||||
 | 
					      case '':
 | 
				
			||||||
 | 
					        baseConfig.ssl = false;
 | 
				
			||||||
 | 
					        break;
 | 
				
			||||||
 | 
					      case 'no-verify':
 | 
				
			||||||
 | 
					        baseConfig.ssl = { rejectUnauthorized: false };
 | 
				
			||||||
 | 
					        break;
 | 
				
			||||||
 | 
					      default:
 | 
				
			||||||
 | 
					        baseConfig.ssl = {};
 | 
				
			||||||
 | 
					        break;
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    throw new Error('Unable to resolve postgresql database configuration.');
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  return {
 | 
				
			||||||
 | 
					    ...baseConfig,
 | 
				
			||||||
 | 
					    max: parseIntFromEnvValue(env.DB_POOL, 10, 'DB_POOL'),
 | 
				
			||||||
 | 
					    connectionTimeoutMillis: 15000,
 | 
				
			||||||
 | 
					    // Deliberately set application_name to an empty string to prevent excessive
 | 
				
			||||||
 | 
					    // CPU usage with PG Bouncer. See:
 | 
				
			||||||
 | 
					    // - https://github.com/mastodon/mastodon/pull/23958
 | 
				
			||||||
 | 
					    // - https://github.com/pgbouncer/pgbouncer/issues/349
 | 
				
			||||||
 | 
					    application_name: '',
 | 
				
			||||||
 | 
					  };
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					let pool;
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * @param {pg.PoolConfig} config
 | 
				
			||||||
 | 
					 * @returns {pg.Pool}
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					export function getPool(config) {
 | 
				
			||||||
 | 
					  if (pool) {
 | 
				
			||||||
 | 
					    return pool;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  pool = new pg.Pool(config);
 | 
				
			||||||
 | 
					  return pool;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -8,15 +8,14 @@ import url from 'node:url';
 | 
				
			|||||||
import cors from 'cors';
 | 
					import cors from 'cors';
 | 
				
			||||||
import dotenv from 'dotenv';
 | 
					import dotenv from 'dotenv';
 | 
				
			||||||
import express from 'express';
 | 
					import express from 'express';
 | 
				
			||||||
import { Redis } from 'ioredis';
 | 
					 | 
				
			||||||
import { JSDOM } from 'jsdom';
 | 
					import { JSDOM } from 'jsdom';
 | 
				
			||||||
import pg from 'pg';
 | 
					 | 
				
			||||||
import pgConnectionString from 'pg-connection-string';
 | 
					 | 
				
			||||||
import { WebSocketServer } from 'ws';
 | 
					import { WebSocketServer } from 'ws';
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import * as Database from './database.js';
 | 
				
			||||||
import { AuthenticationError, RequestError, extractStatusAndMessage as extractErrorStatusAndMessage } from './errors.js';
 | 
					import { AuthenticationError, RequestError, extractStatusAndMessage as extractErrorStatusAndMessage } from './errors.js';
 | 
				
			||||||
import { logger, httpLogger, initializeLogLevel, attachWebsocketHttpLogger, createWebsocketLogger } from './logging.js';
 | 
					import { logger, httpLogger, initializeLogLevel, attachWebsocketHttpLogger, createWebsocketLogger } from './logging.js';
 | 
				
			||||||
import { setupMetrics } from './metrics.js';
 | 
					import { setupMetrics } from './metrics.js';
 | 
				
			||||||
 | 
					import * as Redis from './redis.js';
 | 
				
			||||||
import { isTruthy, normalizeHashtag, firstParam } from './utils.js';
 | 
					import { isTruthy, normalizeHashtag, firstParam } from './utils.js';
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const environment = process.env.NODE_ENV || 'development';
 | 
					const environment = process.env.NODE_ENV || 'development';
 | 
				
			||||||
@ -48,23 +47,6 @@ initializeLogLevel(process.env, environment);
 | 
				
			|||||||
 * @property {string} deviceId
 | 
					 * @property {string} deviceId
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					 | 
				
			||||||
 * @param {RedisConfiguration} config
 | 
					 | 
				
			||||||
 * @returns {Promise<Redis>}
 | 
					 | 
				
			||||||
 */
 | 
					 | 
				
			||||||
const createRedisClient = async ({ redisParams, redisUrl }) => {
 | 
					 | 
				
			||||||
  let client;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (typeof redisUrl === 'string') {
 | 
					 | 
				
			||||||
    client = new Redis(redisUrl, redisParams);
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    client = new Redis(redisParams);
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  client.on('error', (err) => logger.error({ err }, 'Redis Client Error!'));
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  return client;
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
 * Attempts to safely parse a string as JSON, used when both receiving a message
 | 
					 * Attempts to safely parse a string as JSON, used when both receiving a message
 | 
				
			||||||
@ -97,177 +79,6 @@ const parseJSON = (json, req) => {
 | 
				
			|||||||
  }
 | 
					  }
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					 | 
				
			||||||
 * Takes an environment variable that should be an integer, attempts to parse
 | 
					 | 
				
			||||||
 * it falling back to a default if not set, and handles errors parsing.
 | 
					 | 
				
			||||||
 * @param {string|undefined} value
 | 
					 | 
				
			||||||
 * @param {number} defaultValue
 | 
					 | 
				
			||||||
 * @param {string} variableName
 | 
					 | 
				
			||||||
 * @returns {number}
 | 
					 | 
				
			||||||
 */
 | 
					 | 
				
			||||||
const parseIntFromEnv = (value, defaultValue, variableName) => {
 | 
					 | 
				
			||||||
  if (typeof value === 'string' && value.length > 0) {
 | 
					 | 
				
			||||||
    const parsedValue = parseInt(value, 10);
 | 
					 | 
				
			||||||
    if (isNaN(parsedValue)) {
 | 
					 | 
				
			||||||
      throw new Error(`Invalid ${variableName} environment variable: ${value}`);
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    return parsedValue;
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    return defaultValue;
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
/**
 | 
					 | 
				
			||||||
 * @param {NodeJS.ProcessEnv} env the `process.env` value to read configuration from
 | 
					 | 
				
			||||||
 * @returns {pg.PoolConfig} the configuration for the PostgreSQL connection
 | 
					 | 
				
			||||||
 */
 | 
					 | 
				
			||||||
const pgConfigFromEnv = (env) => {
 | 
					 | 
				
			||||||
  /** @type {Record<string, pg.PoolConfig>} */
 | 
					 | 
				
			||||||
  const pgConfigs = {
 | 
					 | 
				
			||||||
    development: {
 | 
					 | 
				
			||||||
      user: env.DB_USER || pg.defaults.user,
 | 
					 | 
				
			||||||
      password: env.DB_PASS || pg.defaults.password,
 | 
					 | 
				
			||||||
      database: env.DB_NAME || 'mastodon_development',
 | 
					 | 
				
			||||||
      host: env.DB_HOST || pg.defaults.host,
 | 
					 | 
				
			||||||
      port: parseIntFromEnv(env.DB_PORT, pg.defaults.port ?? 5432, 'DB_PORT')
 | 
					 | 
				
			||||||
    },
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    production: {
 | 
					 | 
				
			||||||
      user: env.DB_USER || 'mastodon',
 | 
					 | 
				
			||||||
      password: env.DB_PASS || '',
 | 
					 | 
				
			||||||
      database: env.DB_NAME || 'mastodon_production',
 | 
					 | 
				
			||||||
      host: env.DB_HOST || 'localhost',
 | 
					 | 
				
			||||||
      port: parseIntFromEnv(env.DB_PORT, 5432, 'DB_PORT')
 | 
					 | 
				
			||||||
    },
 | 
					 | 
				
			||||||
  };
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  /**
 | 
					 | 
				
			||||||
   * @type {pg.PoolConfig}
 | 
					 | 
				
			||||||
   */
 | 
					 | 
				
			||||||
  let baseConfig = {};
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (env.DATABASE_URL) {
 | 
					 | 
				
			||||||
    const parsedUrl = pgConnectionString.parse(env.DATABASE_URL);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    // The result of dbUrlToConfig from pg-connection-string is not type
 | 
					 | 
				
			||||||
    // compatible with pg.PoolConfig, since parts of the connection URL may be
 | 
					 | 
				
			||||||
    // `null` when pg.PoolConfig expects `undefined`, as such we have to
 | 
					 | 
				
			||||||
    // manually create the baseConfig object from the properties of the
 | 
					 | 
				
			||||||
    // parsedUrl.
 | 
					 | 
				
			||||||
    //
 | 
					 | 
				
			||||||
    // For more information see:
 | 
					 | 
				
			||||||
    // https://github.com/brianc/node-postgres/issues/2280
 | 
					 | 
				
			||||||
    //
 | 
					 | 
				
			||||||
    // FIXME: clean up once brianc/node-postgres#3128 lands
 | 
					 | 
				
			||||||
    if (typeof parsedUrl.password === 'string') baseConfig.password = parsedUrl.password;
 | 
					 | 
				
			||||||
    if (typeof parsedUrl.host === 'string') baseConfig.host = parsedUrl.host;
 | 
					 | 
				
			||||||
    if (typeof parsedUrl.user === 'string') baseConfig.user = parsedUrl.user;
 | 
					 | 
				
			||||||
    if (typeof parsedUrl.port === 'string') {
 | 
					 | 
				
			||||||
      const parsedPort = parseInt(parsedUrl.port, 10);
 | 
					 | 
				
			||||||
      if (isNaN(parsedPort)) {
 | 
					 | 
				
			||||||
        throw new Error('Invalid port specified in DATABASE_URL environment variable');
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
      baseConfig.port = parsedPort;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    if (typeof parsedUrl.database === 'string') baseConfig.database = parsedUrl.database;
 | 
					 | 
				
			||||||
    if (typeof parsedUrl.options === 'string') baseConfig.options = parsedUrl.options;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    // The pg-connection-string type definition isn't correct, as parsedUrl.ssl
 | 
					 | 
				
			||||||
    // can absolutely be an Object, this is to work around these incorrect
 | 
					 | 
				
			||||||
    // types, including the casting of parsedUrl.ssl to Record<string, any>
 | 
					 | 
				
			||||||
    if (typeof parsedUrl.ssl === 'boolean') {
 | 
					 | 
				
			||||||
      baseConfig.ssl = parsedUrl.ssl;
 | 
					 | 
				
			||||||
    } else if (typeof parsedUrl.ssl === 'object' && !Array.isArray(parsedUrl.ssl) && parsedUrl.ssl !== null) {
 | 
					 | 
				
			||||||
      /** @type {Record<string, any>} */
 | 
					 | 
				
			||||||
      const sslOptions = parsedUrl.ssl;
 | 
					 | 
				
			||||||
      baseConfig.ssl = {};
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      baseConfig.ssl.cert = sslOptions.cert;
 | 
					 | 
				
			||||||
      baseConfig.ssl.key = sslOptions.key;
 | 
					 | 
				
			||||||
      baseConfig.ssl.ca = sslOptions.ca;
 | 
					 | 
				
			||||||
      baseConfig.ssl.rejectUnauthorized = sslOptions.rejectUnauthorized;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    // Support overriding the database password in the connection URL
 | 
					 | 
				
			||||||
    if (!baseConfig.password && env.DB_PASS) {
 | 
					 | 
				
			||||||
      baseConfig.password = env.DB_PASS;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  } else if (Object.hasOwn(pgConfigs, environment)) {
 | 
					 | 
				
			||||||
    baseConfig = pgConfigs[environment];
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    if (env.DB_SSLMODE) {
 | 
					 | 
				
			||||||
      switch(env.DB_SSLMODE) {
 | 
					 | 
				
			||||||
      case 'disable':
 | 
					 | 
				
			||||||
      case '':
 | 
					 | 
				
			||||||
        baseConfig.ssl = false;
 | 
					 | 
				
			||||||
        break;
 | 
					 | 
				
			||||||
      case 'no-verify':
 | 
					 | 
				
			||||||
        baseConfig.ssl = { rejectUnauthorized: false };
 | 
					 | 
				
			||||||
        break;
 | 
					 | 
				
			||||||
      default:
 | 
					 | 
				
			||||||
        baseConfig.ssl = {};
 | 
					 | 
				
			||||||
        break;
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  } else {
 | 
					 | 
				
			||||||
    throw new Error('Unable to resolve postgresql database configuration.');
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  return {
 | 
					 | 
				
			||||||
    ...baseConfig,
 | 
					 | 
				
			||||||
    max: parseIntFromEnv(env.DB_POOL, 10, 'DB_POOL'),
 | 
					 | 
				
			||||||
    connectionTimeoutMillis: 15000,
 | 
					 | 
				
			||||||
    // Deliberately set application_name to an empty string to prevent excessive
 | 
					 | 
				
			||||||
    // CPU usage with PG Bouncer. See:
 | 
					 | 
				
			||||||
    // - https://github.com/mastodon/mastodon/pull/23958
 | 
					 | 
				
			||||||
    // - https://github.com/pgbouncer/pgbouncer/issues/349
 | 
					 | 
				
			||||||
    application_name: '',
 | 
					 | 
				
			||||||
  };
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
/**
 | 
					 | 
				
			||||||
 * @typedef RedisConfiguration
 | 
					 | 
				
			||||||
 * @property {import('ioredis').RedisOptions} redisParams
 | 
					 | 
				
			||||||
 * @property {string} redisPrefix
 | 
					 | 
				
			||||||
 * @property {string|undefined} redisUrl
 | 
					 | 
				
			||||||
 */
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
/**
 | 
					 | 
				
			||||||
 * @param {NodeJS.ProcessEnv} env the `process.env` value to read configuration from
 | 
					 | 
				
			||||||
 * @returns {RedisConfiguration} configuration for the Redis connection
 | 
					 | 
				
			||||||
 */
 | 
					 | 
				
			||||||
const redisConfigFromEnv = (env) => {
 | 
					 | 
				
			||||||
  // ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*,
 | 
					 | 
				
			||||||
  // which means we can't use it. But this is something that should be looked into.
 | 
					 | 
				
			||||||
  const redisPrefix = env.REDIS_NAMESPACE ? `${env.REDIS_NAMESPACE}:` : '';
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  let redisPort = parseIntFromEnv(env.REDIS_PORT, 6379, 'REDIS_PORT');
 | 
					 | 
				
			||||||
  let redisDatabase = parseIntFromEnv(env.REDIS_DB, 0, 'REDIS_DB');
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  /** @type {import('ioredis').RedisOptions} */
 | 
					 | 
				
			||||||
  const redisParams = {
 | 
					 | 
				
			||||||
    host: env.REDIS_HOST || '127.0.0.1',
 | 
					 | 
				
			||||||
    port: redisPort,
 | 
					 | 
				
			||||||
    // Force support for both IPv6 and IPv4, by default ioredis sets this to 4,
 | 
					 | 
				
			||||||
    // only allowing IPv4 connections:
 | 
					 | 
				
			||||||
    // https://github.com/redis/ioredis/issues/1576
 | 
					 | 
				
			||||||
    family: 0,
 | 
					 | 
				
			||||||
    db: redisDatabase,
 | 
					 | 
				
			||||||
    password: env.REDIS_PASSWORD || undefined,
 | 
					 | 
				
			||||||
  };
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  // redisParams.path takes precedence over host and port.
 | 
					 | 
				
			||||||
  if (env.REDIS_URL && env.REDIS_URL.startsWith('unix://')) {
 | 
					 | 
				
			||||||
    redisParams.path = env.REDIS_URL.slice(7);
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  return {
 | 
					 | 
				
			||||||
    redisParams,
 | 
					 | 
				
			||||||
    redisPrefix,
 | 
					 | 
				
			||||||
    redisUrl: typeof env.REDIS_URL === 'string' ? env.REDIS_URL : undefined,
 | 
					 | 
				
			||||||
  };
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
const PUBLIC_CHANNELS = [
 | 
					const PUBLIC_CHANNELS = [
 | 
				
			||||||
  'public',
 | 
					  'public',
 | 
				
			||||||
  'public:media',
 | 
					  'public:media',
 | 
				
			||||||
@ -291,10 +102,12 @@ const CHANNEL_NAMES = [
 | 
				
			|||||||
];
 | 
					];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const startServer = async () => {
 | 
					const startServer = async () => {
 | 
				
			||||||
  const pgPool = new pg.Pool(pgConfigFromEnv(process.env));
 | 
					  const pgPool = Database.getPool(Database.configFromEnv(process.env, environment));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  const metrics = setupMetrics(CHANNEL_NAMES, pgPool);
 | 
					  const metrics = setupMetrics(CHANNEL_NAMES, pgPool);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  const redisConfig = Redis.configFromEnv(process.env);
 | 
				
			||||||
 | 
					  const redisClient = Redis.createClient(redisConfig, logger);
 | 
				
			||||||
  const server = http.createServer();
 | 
					  const server = http.createServer();
 | 
				
			||||||
  const wss = new WebSocketServer({ noServer: true });
 | 
					  const wss = new WebSocketServer({ noServer: true });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -386,9 +199,7 @@ const startServer = async () => {
 | 
				
			|||||||
   */
 | 
					   */
 | 
				
			||||||
  const subs = {};
 | 
					  const subs = {};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  const redisConfig = redisConfigFromEnv(process.env);
 | 
					  const redisSubscribeClient = Redis.createClient(redisConfig, logger);
 | 
				
			||||||
  const redisSubscribeClient = await createRedisClient(redisConfig);
 | 
					 | 
				
			||||||
  const redisClient = await createRedisClient(redisConfig);
 | 
					 | 
				
			||||||
  const { redisPrefix } = redisConfig;
 | 
					  const { redisPrefix } = redisConfig;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // When checking metrics in the browser, the favicon is requested this
 | 
					  // When checking metrics in the browser, the favicon is requested this
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										65
									
								
								streaming/redis.js
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										65
									
								
								streaming/redis.js
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,65 @@
 | 
				
			|||||||
 | 
					import { Redis } from 'ioredis';
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import { parseIntFromEnvValue } from './utils.js';
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * @typedef RedisConfiguration
 | 
				
			||||||
 | 
					 * @property {import('ioredis').RedisOptions} redisParams
 | 
				
			||||||
 | 
					 * @property {string} redisPrefix
 | 
				
			||||||
 | 
					 * @property {string|undefined} redisUrl
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * @param {NodeJS.ProcessEnv} env the `process.env` value to read configuration from
 | 
				
			||||||
 | 
					 * @returns {RedisConfiguration} configuration for the Redis connection
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					export function configFromEnv(env) {
 | 
				
			||||||
 | 
					  // ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*,
 | 
				
			||||||
 | 
					  // which means we can't use it. But this is something that should be looked into.
 | 
				
			||||||
 | 
					  const redisPrefix = env.REDIS_NAMESPACE ? `${env.REDIS_NAMESPACE}:` : '';
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let redisPort = parseIntFromEnvValue(env.REDIS_PORT, 6379, 'REDIS_PORT');
 | 
				
			||||||
 | 
					  let redisDatabase = parseIntFromEnvValue(env.REDIS_DB, 0, 'REDIS_DB');
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  /** @type {import('ioredis').RedisOptions} */
 | 
				
			||||||
 | 
					  const redisParams = {
 | 
				
			||||||
 | 
					    host: env.REDIS_HOST || '127.0.0.1',
 | 
				
			||||||
 | 
					    port: redisPort,
 | 
				
			||||||
 | 
					    // Force support for both IPv6 and IPv4, by default ioredis sets this to 4,
 | 
				
			||||||
 | 
					    // only allowing IPv4 connections:
 | 
				
			||||||
 | 
					    // https://github.com/redis/ioredis/issues/1576
 | 
				
			||||||
 | 
					    family: 0,
 | 
				
			||||||
 | 
					    db: redisDatabase,
 | 
				
			||||||
 | 
					    password: env.REDIS_PASSWORD || undefined,
 | 
				
			||||||
 | 
					  };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  // redisParams.path takes precedence over host and port.
 | 
				
			||||||
 | 
					  if (env.REDIS_URL && env.REDIS_URL.startsWith('unix://')) {
 | 
				
			||||||
 | 
					    redisParams.path = env.REDIS_URL.slice(7);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  return {
 | 
				
			||||||
 | 
					    redisParams,
 | 
				
			||||||
 | 
					    redisPrefix,
 | 
				
			||||||
 | 
					    redisUrl: typeof env.REDIS_URL === 'string' ? env.REDIS_URL : undefined,
 | 
				
			||||||
 | 
					  };
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * @param {RedisConfiguration} config
 | 
				
			||||||
 | 
					 * @param {import('pino').Logger} logger
 | 
				
			||||||
 | 
					 * @returns {Redis}
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					export function createClient({ redisParams, redisUrl }, logger) {
 | 
				
			||||||
 | 
					  let client;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (typeof redisUrl === 'string') {
 | 
				
			||||||
 | 
					    client = new Redis(redisUrl, redisParams);
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    client = new Redis(redisParams);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  client.on('error', (err) => logger.error({ err }, 'Redis Client Error!'));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  return client;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -59,3 +59,23 @@ export function firstParam(arrayOrString) {
 | 
				
			|||||||
    return arrayOrString;
 | 
					    return arrayOrString;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/**
 | 
				
			||||||
 | 
					 * Takes an environment variable that should be an integer, attempts to parse
 | 
				
			||||||
 | 
					 * it falling back to a default if not set, and handles errors parsing.
 | 
				
			||||||
 | 
					 * @param {string|undefined} value
 | 
				
			||||||
 | 
					 * @param {number} defaultValue
 | 
				
			||||||
 | 
					 * @param {string} variableName
 | 
				
			||||||
 | 
					 * @returns {number}
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					export function parseIntFromEnvValue(value, defaultValue, variableName) {
 | 
				
			||||||
 | 
					  if (typeof value === 'string' && value.length > 0) {
 | 
				
			||||||
 | 
					    const parsedValue = parseInt(value, 10);
 | 
				
			||||||
 | 
					    if (isNaN(parsedValue)) {
 | 
				
			||||||
 | 
					      throw new Error(`Invalid ${variableName} environment variable: ${value}`);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    return parsedValue;
 | 
				
			||||||
 | 
					  } else {
 | 
				
			||||||
 | 
					    return defaultValue;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user