Node.js: Потоки (Stream) и pipe()

Node.js: Потоки (Stream) и pipe()
5 (100%) 1 vote

Потоковая технология проявляется во всех базовых аспектах Node; она предоставляет функциональность для HTTP, а также для других форм сетевой передачи данных. Кроме того, она предоставляет функциональность для файловой системы.

Поток представлен абстрактным интерфейсом, это означает, что вы не будете создавать потоки напрямую. Вместо этого вы будете работать с различными объектами, реализующими интерфейс Stream, — запросами HTTP, потоками для чтения или записи модуля File System, объектами сжатия Zlib или process.stdout. Непосредственно реализовать Stream API вам придется только в одном случае: при создании собственной реализации потока.

Так как многие объекты в Node реализуют потоковый интерфейс, все потоки в Node обладают базовой функциональностью:

  • Изменение кодировки потоковых данных вызовом setEncoding.
  • Проверка возможности чтения и (или) записи данных в поток.
  • Перехват событий потоков (например, получения данных или закрытия подключения) с назначением функций обратного вызова для каждого события.
  • Приостановка и возобновление потока.
  • Перенаправление данных из потока для чтения в поток для записи.

Обратите внимание на пункт с проверкой чтения и (или) записи. Поток с поддержкой чтения и записи называется дуплексным. Также существует подвид дуплексных потоков, называемый потоком преобразования данных, в котором ввод и вывод связаны причинной зависимостью. Такая разновидность потоков будет описана позднее, когда я буду рассматривать сжатие Zlib.

Поток для чтения начинает работу в приостановленном состоянии; это означает, что никакие данные не будут отправляться до того момента, пока не будет явно выполнена операция чтения ( stream.read() ) или команда возобновления работы потока ( stream.resume() ). Однако используемые нами реализации потоков, такие как поток для чтения модуля File System, переключаются в рабочий режим сразу же при программировании события данных (механизм получения доступа к данным в потоке для чтения). В рабочем режиме данные передаются приложению сразу же при их появлении.



Потоки для чтения поддерживают несколько событий, но на практике нас обычно интересуют три события: data, end и error. Событие data отправляется при получении нового фрагмента данных, готового к использованию, а событие end — при потреблении всех данных. Событие error, естественно, отправляется при возникновении ошибки. Ниже вы увидите в примерах использование модуля File System.

Поток для записи представляет собой приемник, в который передаются (записываются) данные. Среди прослушиваемых событий можно выделить error и событие finish, происходящее при вызове end() и сбросе всех данных. Также встречается событие drain , генерируемое в тот момент, когда попытка записи данных возвращает false.

Дуплексный поток обладает качествами потоков как для чтения, так и для записи. Поток преобразования данных представляет собой дуплексный поток, в котором — в отличие от обычных дуплексных потоков, где внутренние входные и выходные буферы существуют независимо друг от друга, — эти два буфера связаны напрямую через промежуточный этап преобразования данных. Во внутренней реализации поток преобразования данных должен реализовать функцию _transform(), которая получает входные данные, что-то с ними делает, а затем записывает в вывод.

Чтобы лучше понять суть потока преобразования данных, необходимо поближе познакомиться с функциональностью, поддерживаемой всеми потоками: функцией pipe(). Пример ее использования представлен ниже, где поток для чтения напрямую связывал содержимое файла с объектом ответа HTTP:
Создание и перенаправление потока для чтения

const file = fs.createReadStream(pathname);
file.on("open", function() {
  file.pipe(res);
});

Вызов pipe() извлекает данные из файла (поток) и выводит их в объект http.ServerResponse. В документации Node указано, что этот объект реализует интерфейс потока для записи и, как будет показано позднее, fs.createReadStream() возвращает fs.ReadStream — реализацию потока для чтения. В число методов, поддерживаемых потоком для чтения, входит и pipe() с потоком для записи.

Скоро я добавлю (возможно уже добавил) статью / пример использования модуля Zlib для сжатия файла, а пока ограничимся кратким примером, отлично демонстрирующим применение потока преобразования данных:

const gzip = zlib.createGzip();
const fs   = require('fs');
const inp  = fs.createReadStream('input.txt');
const out  = fs.createWriteStream('input.txt.gz');

inp.pipe(gzip).pipe(out);

На вход поступает поток для чтения, на выходе находится поток для записи. Содержимое одного потока передается в другой, но сначала проходит через процедуру сжатия (это и есть преобразование).


Об авторе

Занимаюсь программированием уже более 7 лет. Часто использую JavaScript (Node.js) и Python.

Комментарии