Skip to content

@microsoft/fetch-event-source

ts
import { FetchEventSourceInit, fetchEventSource } from '@microsoft/fetch-event-source';

/**
 * String identifiers for the message types known to this module.
 *
 * NOTE(review): presumably these are the server-sent event names and are meant to be
 * passed as `type` to `SSEAdapter.addEventListener` — confirm against callers.
 */
export enum MESSAGE_TYPE {
  NOTIFICATION = 'userlog-notification',
  POSTPROCESSING_FINISHED = 'postprocessing-finished',
  FILE_LOCKED = 'file-locked',
  FILE_UNLOCKED = 'file-unlocked',
  FILE_TOUCHED = 'file-touched',
  ITEM_RENAMED = 'item-renamed',
  ITEM_TRASHED = 'item-trashed',
  ITEM_RESTORED = 'item-restored',
  ITEM_MOVED = 'item-moved',
  FOLDER_CREATED = 'folder-created',
  SPACE_MEMBER_ADDED = 'space-member-added',
  SPACE_MEMBER_REMOVED = 'space-member-removed',
  SPACE_SHARE_UPDATED = 'space-share-updated',
  SHARE_CREATED = 'share-created',
  SHARE_REMOVED = 'share-removed',
  SHARE_UPDATED = 'share-updated',
  LINK_CREATED = 'link-created',
  LINK_REMOVED = 'link-removed',
  LINK_UPDATED = 'link-updated',
  BACKCHANNEL_LOGOUT = 'backchannel-logout'
}

/**
 * Error type used to signal that the event stream should be re-established.
 * Instances always report `name === 'RetriableError'`.
 */
export class RetriableError extends Error {
  constructor(...args: ConstructorParameters<typeof Error>) {
    super(...args);
    this.name = 'RetriableError';
  }
}

// Upper bound (exclusive), in milliseconds, of the random jitter added to the reconnect delay.
const RECONNECT_RANDOM_OFFSET = 15000;

/**
 * `EventSource`-shaped wrapper around `fetchEventSource`, so that custom fetch options
 * (e.g. headers) can be supplied for a server-sent-events connection.
 *
 * The connection is opened from the constructor. Messages are delivered to `onmessage`
 * and to listeners registered via `addEventListener` under the message's event name.
 */
export class SSEAdapter implements EventSource {
  url: string;
  fetchOptions: FetchEventSourceInit;
  // Replaced by a fresh controller on reconnect: an AbortController can only abort once.
  private abortController: AbortController = new AbortController();
  private eventListenerMap: Record<string, ((event: MessageEvent) => any)[]> = {};

  readyState: number;
  readonly withCredentials: boolean;

  // These must be real values (not just type declarations), otherwise `readyState`
  // would be assigned `undefined` at runtime.
  readonly CONNECTING: 0 = 0;
  readonly OPEN: 1 = 1;
  readonly CLOSED: 2 = 2;

  onerror: ((this: EventSource, ev: Event) => any) | null = null;
  onmessage: ((this: EventSource, ev: MessageEvent) => any) | null = null;
  onopen: ((this: EventSource, ev: Event) => any) | null = null;

  /**
   * @param url - Endpoint of the event stream.
   * @param fetchOptions - Options merged into every fetch call issued for the stream.
   *   The object is kept by reference and mutated by `updateAccessToken`/`updateLanguage`.
   */
  constructor(url: string, fetchOptions: FetchEventSourceInit) {
    this.url = url;
    this.fetchOptions = fetchOptions;
    this.withCredentials = fetchOptions?.credentials === 'include';
    this.readyState = this.CONNECTING;
    this.connect();
  }

  /** Opens the event stream and wires the `fetchEventSource` callbacks to this adapter. */
  private connect() {
    // After close() the previous signal is permanently aborted and could no longer
    // cancel a new connection, so every reconnect needs its own controller.
    if (this.abortController.signal.aborted) {
      this.abortController = new AbortController();
    }
    this.readyState = this.CONNECTING;

    return fetchEventSource(this.url, {
      openWhenHidden: true,
      signal: this.abortController.signal,
      fetch: this.fetchProvider.bind(this),

      onopen: async () => {
        // Update the state first so the handler observes an open connection.
        this.readyState = this.OPEN;
        this.onopen?.call(this, new Event('open'));
      },
      onmessage: (msg) => {
        const event = new MessageEvent('message', { data: msg.data });
        this.onmessage?.call(this, event);

        // Named events are additionally dispatched to listeners registered for that name.
        const eventListeners = this.eventListenerMap[msg.event];
        eventListeners?.forEach(l => l.call(this, event));
      },
      onclose: () => {
        this.readyState = this.CLOSED;
        // Throwing routes the close through `onerror`, whose return value is the retry delay.
        throw new RetriableError();
      },
      onerror: (err) => {
        console.error(err);
        const event = new CustomEvent('error', { detail: err });
        this.onerror?.call(this, event);

        /*
         * Try to reconnect after 30 seconds plus a random offset.
         * This prevents all clients from reconnecting concurrently after a server error, to reduce load.
         */
        return 30000 + Math.floor(Math.random() * RECONNECT_RANDOM_OFFSET);
      }
    });
  }

  /**
   * Fetch implementation handed to `fetchEventSource`; overlays `fetchOptions` on the
   * config it is called with.
   *
   * NOTE(review): the spread is shallow, so `fetchOptions.headers` replaces (rather than
   * merges with) any `headers` in `config` — confirm this is intended.
   */
  fetchProvider(...args: [RequestInfo | URL, RequestInit]) {
    const [resource, config] = args;
    const fetchConfig = { ...config, ...this.fetchOptions };
    return window.fetch(resource, fetchConfig);
  }

  /** Aborts the current connection and marks the adapter as closed. */
  close() {
    this.abortController.abort('closed');
    this.readyState = this.CLOSED;
  }

  /** Registers `listener` for messages whose event name equals `type`. */
  addEventListener<K extends keyof EventSourceEventMap>(
    type: string,
    listener: (this: EventSource, event: EventSourceEventMap[K]) => any
  ): void {
    this.eventListenerMap[type] = this.eventListenerMap[type] || [];
    this.eventListenerMap[type].push(listener);
  }

  /** Removes a previously registered `listener`; a no-op for unknown types or listeners. */
  removeEventListener<K extends keyof EventSourceEventMap>(
    type: string,
    listener: (this: EventSource, event: EventSourceEventMap[K]) => any
  ): void {
    const listeners = this.eventListenerMap[type];
    if (!listeners) {
      return;
    }
    this.eventListenerMap[type] = listeners.filter(func => func !== listener);
  }

  /** Not supported by this adapter; always throws. */
  dispatchEvent(): boolean {
    throw new Error('Method not implemented.');
  }

  /** Sets the `Authorization` header used by subsequent fetch calls. */
  updateAccessToken(token: string) {
    this.setHeader('Authorization', `Bearer ${token}`);
  }

  /** Sets the `Accept-Language` header and reconnects so the change applies immediately. */
  updateLanguage(language: string) {
    this.setHeader('Accept-Language', language);

    // Force reconnect, to make the language change take effect instantly
    this.close();
    this.connect();
  }

  /** Writes a header into `fetchOptions`, creating the headers object when it is missing. */
  private setHeader(name: string, value: string) {
    if (!this.fetchOptions.headers) {
      this.fetchOptions.headers = {};
    }
    this.fetchOptions.headers[name] = value;
  }
}

// Lazily created adapter instance shared by every caller of `sse`.
// Typed as nullable so the initial `null` is valid under `strictNullChecks`.
let eventSource: SSEAdapter | null = null;

/**
 * Returns the shared `SSEAdapter`, creating it on the first call.
 *
 * @param baseURI - Base against which the notifications SSE path is resolved.
 * @param fetchOptions - Fetch options handed to the adapter.
 * @returns The shared adapter. Once it exists, later calls return it unchanged and
 *   their arguments are ignored.
 */
export function sse(baseURI: string, fetchOptions: FetchEventSourceInit): SSEAdapter {
  if (eventSource === null) {
    const endpoint = new URL('ocs/v2.php/apps/notifications/api/v1/notifications/sse', baseURI);
    eventSource = new SSEAdapter(endpoint.href, fetchOptions);
  }

  return eventSource;
}
ts
import { useCallback, useContext, useEffect } from 'react';
import { EventStreamContentType, fetchEventSource } from '@microsoft/fetch-event-source';

import { ErrorContext } from './hooks/error';

// localStorage key under which the auth token is looked up (see `getAuthHeader`).
export const AUTH_TOKEN_KEY = 'fastui-auth-token';
// HTTP methods accepted by the request helpers in this module.
export type Method = 'GET' | 'POST' | 'PATCH' | 'PUT' | 'DELETE';

/**
 * Hook returning a memoised request function.
 *
 * The returned function delegates to `request`. When that fails, the failure is reported
 * through `setError` from `ErrorContext` (including the status code for `RequestError`s)
 * and the original error is rethrown to the caller.
 */
export function useRequest(): (args: RequestArgs) => Promise<[number, any]> {
  const { setError } = useContext(ErrorContext);

  return useCallback(
    async (args: RequestArgs) => {
      try {
        return await request(args);
      }
      catch (err) {
        const report = err instanceof RequestError
          ? { title: 'Request Error', description: err.message, statusCode: err.status }
          : { title: 'Request Error', description: (err as any)?.message };
        setError(report);
        throw err;
      }
    },
    [setError],
  );
}

/** Arguments accepted by `request` (and by the function returned from `useRequest`). */
export interface RequestArgs {
  // Target URL; `query`, when given, is appended to it.
  url: string;
  // HTTP method; when omitted and a `json` or `formData` body is given, 'POST' is used.
  method?: Method;
  // Status codes treated as success; when omitted, any status for which `response.ok` is true is accepted.
  expectedStatus?: number[];
  // Query parameters appended to `url`.
  query?: Record<string, string> | URLSearchParams;
  // JSON body; serialised with `JSON.stringify`. Takes precedence over `formData`.
  json?: Record<string, any>;
  // Form body; sent as-is, without setting a Content-Type header explicitly.
  formData?: FormData;
  // Extra request headers.
  headers?: Record<string, string> | Headers;
}

/**
 * Appends `query` to `url`, using `&` when the URL already carries a query string and
 * adding nothing at all when the query is empty.
 */
function appendQuery(url: string, query: Record<string, string> | URLSearchParams): string {
  const search = new URLSearchParams(query).toString();
  if (!search) {
    return url;
  }
  if (url.endsWith('?') || url.endsWith('&')) {
    return `${url}${search}`;
  }
  return `${url}${url.includes('?') ? '&' : '?'}${search}`;
}

/**
 * Performs a fetch request and parses the response body as JSON.
 *
 * - A `json` body is serialised and sent as `application/json` (unless a Content-Type
 *   header was supplied); otherwise a `formData` body is sent as-is. Either one makes
 *   the method default to 'POST'.
 * - The header returned by `getAuthHeader()`, if any, is always applied.
 *
 * @returns A `[status, data]` tuple, where `data` is the parsed JSON body.
 * @throws RequestError with status 0 when `fetch` itself fails, with the response status
 *   when `checkResponse` rejects it, or when the body is not valid JSON.
 */
async function request({
  url,
  method,
  headers,
  query,
  json,
  expectedStatus,
  formData,
}: RequestArgs): Promise<[number, any]> {
  const init: RequestInit = {};

  let contentType: string | null = null;
  if (json) {
    init.body = JSON.stringify(json);
    contentType = 'application/json';
    method = method ?? 'POST';
  }
  else if (formData) {
    // don't set content-type, let the browser set it
    init.body = formData;
    method = method ?? 'POST';
  }

  if (query) {
    url = appendQuery(url, query);
  }

  const requestHeaders = new Headers(headers);
  if (contentType && !requestHeaders.get('Content-Type')) {
    requestHeaders.set('Content-Type', contentType);
  }

  const authHeader = getAuthHeader();
  if (authHeader) {
    requestHeaders.set(authHeader.key, authHeader.value);
  }
  init.headers = requestHeaders;

  if (method) {
    init.method = method;
  }

  let response: Response;
  try {
    response = await fetch(url, init);
  }
  catch (e) {
    // Keep the underlying cause visible; the thrown RequestError only carries a generic message.
    console.warn(`${url} -> fetch failed:`, e);
    throw new RequestError('fetch failed', 0);
  }

  const status = await checkResponse(url, response, expectedStatus);
  let data;
  try {
    data = await response.json();
  }
  catch (e) {
    console.warn(`${url} -> ${status} response not valid JSON`);
    throw new RequestError('Response not valid JSON', status);
  }
  console.debug(`${url} -> ${status} JSON:`, data);
  return [status, data];
}

/**
 * Hook that subscribes to a server-sent-events endpoint for as long as the component is
 * mounted and the arguments stay the same.
 *
 * @param url - Endpoint of the event stream.
 * @param onMessage - Called with the JSON-parsed `data` of every message. It is an effect
 *   dependency, so a new function identity restarts the stream.
 * @param method - Optional HTTP method for the request.
 * @param retry - When a number, a closed stream is retried and this value is returned
 *   from `onerror` as the retry interval; otherwise a closed stream ends the subscription.
 *
 * Failures other than a deliberate stop are reported through `setError` from `ErrorContext`.
 */
export function useSSE(url: string, onMessage: (data: any) => void, method?: Method, retry?: number): void {
  const { setError } = useContext(ErrorContext);

  useEffect(() => {
    let stop = false;
    // Lets the cleanup cancel the stream right away instead of waiting for the next message.
    const abortController = new AbortController();
    const headers: Record<string, string> = {};
    const authHeader = getAuthHeader();
    if (authHeader) {
      headers[authHeader.key] = authHeader.value;
    }
    fetchEventSource(url, {
      method,
      headers,
      signal: abortController.signal,
      async onopen(response) {
        const status = await checkResponse(url, response, [200]);
        const ct = response.headers.get('content-type');
        if (!ct || !ct.startsWith(EventStreamContentType)) {
          console.warn(`${url} -> ${status} content-type "${ct}" != "${EventStreamContentType}"`);
          throw new RequestError('Response not valid event stream', status);
        }
        console.debug(`${url} -> ${status} event stream`);
        // ok
      },
      onmessage(e) {
        if (stop) {
          throw new SSEStopError();
        }
        const data = JSON.parse(e.data);
        onMessage(data);
      },
      onclose() {
        // Never schedule a retry once the effect has been cleaned up.
        if (!stop && typeof retry === 'number') {
          throw new SSERetryError();
        }
        else {
          throw new SSEStopError();
        }
      },
      onerror(e) {
        if (e instanceof SSERetryError) {
          console.debug('SSE retrying');
          return retry;
        }
        else {
          throw e;
        }
      },
    }).catch((e) => {
      if (stop || e instanceof SSEStopError) {
        // do nothing, this is fine: either a deliberate stop or the effect is already gone
        return;
      }
      const title = 'Request Error';
      if (e instanceof RequestError) {
        setError({ title, description: e.message, statusCode: e.status });
      }
      else {
        setError({ title, description: (e as any)?.message });
      }
      throw e;
    });

    return () => {
      stop = true;
      abortController.abort();
    };
  }, [setError, url, onMessage, method, retry]);
}

// Thrown from useSSE's `onclose` so that `onerror` returns the retry interval.
// `name` is set explicitly so logs show the class name rather than the generic 'Error'.
class SSERetryError extends Error {
  name = 'SSERetryError';
}

// Thrown inside useSSE to end the stream deliberately; the hook's `.catch` ignores it.
// `name` is set explicitly so logs show the class name rather than the generic 'Error'.
class SSEStopError extends Error {
  name = 'SSEStopError';
}

/**
 * Error describing a failed request.
 * `status` carries the HTTP status code that was passed in by the thrower.
 */
class RequestError extends Error {
  constructor(message: string, public status: number) {
    super(message);
    this.name = 'RequestError';
  }
}

/**
 * Decides whether a response counts as successful.
 * With `expectedStatus`, the response status must be listed in it; without it,
 * `response.ok` decides.
 */
function responseOk(response: Response, expectedStatus?: number[]) {
  return expectedStatus ? expectedStatus.includes(response.status) : response.ok;
}

/**
 * Logs a warning for a value that the type system says cannot occur.
 * Typing `unexpectedValue` as `never` makes call sites fail to compile when a case is unhandled.
 *
 * @param msg - Warning message.
 * @param unexpectedValue - The value that should have been impossible.
 * @param args - Optional extra context, logged as-is.
 */
export function unreachable(msg: string, unexpectedValue: never, args?: any) {
  const context = { unexpectedValue };
  console.warn(msg, context, args);
}

type Callable = (...args: any[]) => void;

/**
 * Wraps `fn` so that it only runs once calls have stopped for `delay` milliseconds.
 * Every call cancels the previously scheduled run; the last call's arguments win.
 *
 * @param fn - Function to debounce.
 * @param delay - Quiet period in milliseconds.
 * @returns A function typed like `fn`; it returns nothing.
 */
export function debounce<C extends Callable>(fn: C, delay: number): C {
  let pending: any;

  const debounced = (...callArgs: any[]): void => {
    clearTimeout(pending);
    pending = setTimeout(() => fn(...callArgs), delay);
  };

  // @ts-expect-error - functions are contravariant, so this should be fine, no idea how to satisfy TS though
  return debounced;
}

/**
 * Resolves after `ms` milliseconds.
 *
 * @param ms - Delay passed to `setTimeout`.
 */
export async function sleep(ms: number): Promise<void> {
  await new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}

// Turns an identifier into a title: `_` and `-` become spaces and each word is capitalised.
// usage: `asTitle('what_ever')` -> 'What Ever'
export const asTitle = (s: string): string => {
  const spaced = s.replace(/[_-]/g, ' ');
  return spaced.replace(/(_|\b)\w/g, l => l.toUpperCase());
};

/**
 * Converts text into a URL-friendly slug: lower-cased, whitespace turned into `-`,
 * everything except word characters and `-` removed, and dashes collapsed and trimmed.
 */
export function slugify(s: string): string {
  // Applied in order; each entry is [pattern, replacement].
  const steps: [RegExp, string][] = [
    [/\s+/g, '-'], // Replace whitespace runs with -
    [/[^\w-]+/g, ''], // Remove all non-word characters
    [/-{2,}/g, '-'], // Replace multiple - with single -
    [/^-+/, ''], // Trim - from start of text
    [/-+$/, ''], // Trim - from end of text
  ];

  return steps.reduce(
    (slug, [pattern, replacement]) => slug.replace(pattern, replacement),
    s.toLowerCase(),
  );
}

/**
 * Builds the Authorization header from the token stored in localStorage under
 * `AUTH_TOKEN_KEY`.
 *
 * @returns The header name/value pair, or `undefined` when no (non-empty) token is stored.
 */
function getAuthHeader(): { key: string; value: string } | undefined {
  const storedToken = localStorage.getItem(AUTH_TOKEN_KEY);
  if (!storedToken) {
    return undefined;
  }
  // we use a custom auth-schema as well-known values like `Basic` and `Bearer` are not correct here
  return { key: 'Authorization', value: `Token ${storedToken}` };
}

/**
 * Validates a response's status and returns it.
 *
 * When `responseOk` rejects the response, the body is read and a `RequestError` is thrown.
 * Its message is `"<detail> (<status>)"`, where detail is the string `detail` field of a
 * JSON body, the raw body when it is not parseable as expected, or otherwise
 * `response.statusText`.
 *
 * @param url - Only used for log messages.
 * @param response - Response to check.
 * @param expectedStatus - Forwarded to `responseOk`.
 * @throws RequestError when the status is not accepted.
 */
async function checkResponse(url: string, response: Response, expectedStatus: number[] | undefined): Promise<number> {
  const { status } = response;
  if (responseOk(response, expectedStatus)) {
    return status;
  }

  const body = await response.text();
  let detail: null | string = null;
  try {
    const parsed = JSON.parse(body);
    console.warn(`${url} -> ${status} JSON:`, parsed);
    if (typeof parsed.detail === 'string') {
      detail = parsed.detail;
    }
  }
  catch (e) {
    // Not usable as JSON: fall back to the raw body as the detail.
    console.warn(`${url} -> ${status} content:`, body);
    detail = body;
  }

  throw new RequestError(`${detail || response.statusText} (${status})`, status);
}

Contributors

作者:Long Mo
字数统计:1.1k 字
阅读时长:7 分钟
Long Mo
文章作者:Long Mo
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Longmo Docs