Dart Async programming - Stream

2022. 6. 24. 14:35DEV/Dart

반응형

Stream

  • Stream은 비동기식 데이터 시퀀스를 제공한다. Streams provide an asynchronous sequence of data.
  • 데이터 시퀀스는 사용자 생성 이벤트나 파일에서 읽은 데이터를 포함한다. Data sequences include user-generated events and data read from files.
  • Stream API에서 await for 또는 listen() 을 사용해서 Stream을 처리할 수 있다. You can process a stream using either await for or listen() from the Stream API.
  • Stream은 오류에 반응하는 방법을 제공한다. Streams provide a way to respond to errors.
  • 두 가지의 Stream이 있다: Single Subscription, Broadcast There are two kinds of streams: single subscription or broadcast.

Single Subscription

import 'dart:async';

void main() async {
  // Stream 생성
  final controller = StreamController(); // dart:async package 필수
  final stream = controller.stream; // Single Stream

  // Listner 생성
  final streamListner1 = stream.listen((val) {
      // 리스닝을 하고 있을 때 받은 값을 사용
    print('Listner1 -> $val');
  });

  // Stream은 여러 개의 값을 돌려줄 수 있다
  for (int i = 1; i < 6; i++) {
    controller.sink.add(i);
  }
}
=>
Listner1 -> 1
Listner1 -> 2
Listner1 -> 3
Listner1 -> 4
Listner1 -> 5

Broadcast

여러 개의 Listner를 추가해서 여러 번 리스닝할 수 있다. 이때 controller.stream.asBroadcastStream()을 추가해줘야 여러 개의 Listner를 생성할 수 있다.

import 'dart:async';

void main() async {
  // stream 생성
  final controller = StreamController(); // dart:async 필수
    final stream = controller.stream
      .asBroadcastStream(); // asBroadcastStream을 해줘야 여러 개의 listner 생성 가능

  // Listner 생성
  final streamListner1 = stream.listen((val) {
    print('Listner1 -> $val');
  });

  final streamListner2 = stream.listen((val) {
    print('Listner2 -> $val');
  });

  for (int i = 1; i < 6; i++) {
    controller.sink.add(i);
  }
}
=>
Listner1 -> 1
Listner2 -> 1
Listner1 -> 2
Listner2 -> 2
Listner1 -> 3
Listner2 -> 3
Listner1 -> 4
Listner2 -> 4
Listner1 -> 5
Listner2 -> 5

Stream 값 변경

바로 Stream Listner의 출력 값을 선택적으로 조정할 수도 있다.

import 'dart:async';

void main() async {
  // stream 생성
  final controller = StreamController(); // dart:async 필수
  final stream = controller.stream
      .asBroadcastStream(); // asBroadcastStream을 해줘야 여러 개의 listner 생성 가능

  // Listner 생성

  final streamListner1 = stream.where((val) => val % 2 == 0).listen((val) {
    print('Listner1 -> $val');
  });

  final streamListner2 = stream.where((val) => val % 2 == 1).listen((val) {
    print('Listner2 -> $val');
  });

  for (int i = 1; i < 6; i++) {
    controller.sink.add(i);
  }
}
=>
Listner2 -> 1
Listner1 -> 2
Listner2 -> 3
Listner1 -> 4
Listner2 -> 5

설정한 조건에 맞춰 각 리스너가 작동하는 것을 확인할 수 있다.

Stream 함수

calculate(int num) {
  for (int i = 0; i < 5; i++) {
    return i * num;
  }
}
=> 0

위와 같은 일반적인 함수의 경우 return을 하는 순간 함수가 끝나므로 결과적으로 0 값만 받을 수가 있다. 하지만 Stream을 사용하면 모든 값을 받을 수 있게 된다.

일단 이해를 쉽게 하기 위해 위 함수에 Future를 적용해보자.

Future<int> calculate(int num) async {
  for (int i = 0; i < 5; i++) {
    return i * num;
  }
}

이를 Stream을 적용하려면 async 대신 async*을 사용하고, return 대신 yield를 사용한다.

import 'dart:async';

void main() async {
  calculate(1).listen((val){
    print(val);
  });
}

Stream<int> calculate(int num) async* {
  for (int i = 0; i < 5; i++) {
    yield i * num;

    await Future.delayed(Duration(seconds: 1)); // 시간차를 두기 위한 설정
  }
}
=>
0
1
2
3
4

이렇게 하면 1초 간격으로 모든 값을 받아서 사용할 수 있게 된다.

이번에는 Stream의 흐름에 대해서 살펴보자.

import 'dart:async';

void main() async {
  calculate(1).listen((val) {
    print('1 -> $val');
  });

  calculate(2).listen((val) {
    print('2 -> $val');
  });
}

Stream<int> calculate(int num) async* {
  for (int i = 0; i < 5; i++) {
    yield i * num;

    await Future.delayed(Duration(seconds: 1)); // 시간차를 두기 위해 추가
  }
}

=>
1 -> 0
2 -> 0
1 -> 1
2 -> 2
1 -> 2
2 -> 4
1 -> 3
2 -> 6
1 -> 4
2 -> 8

calculate(1)과 calculate(2)가 동시에 실행되는 것을 알 수 있다.

calculate() 자체적으로 async를 적용해 calculate(1)이 완전히 끝난 후 calculate(2)를 실행하도록 해보자.

import 'dart:async';

void main() async {
  runAll().listen((val){
    print(val);
  });
}

// Stream에서 await 기능 사용하기
Stream<int> runAll() async* {
  yield* calculate(1);
  yield* calculate(2);
}

Stream<int> calculate(int num) async* {
  for (int i = 0; i < 5; i++) {
    yield i * num;

    await Future.delayed(Duration(seconds: 1)); // 시간차를 두기 위해 추가
  }
}
=>
0 // calculate(1) 시작
1
2
3
4
0 // calculate(2) 시작
2
4
6
8

이제 원하는대로 순서대로 실행할 수 있다.

REFERENCE
Asynchronous programming: Streams
[Youtube]40분만에 끝내는 비동기 프로그래밍

반응형

'DEV > Dart' 카테고리의 다른 글

Dart Async programing - Future, await  (0) 2022.06.23
Dart의 형변환 - List, Map, Set  (0) 2022.06.22
Dart의 함수를 알아보자  (0) 2022.06.22
Dart - Null safety & Dart Codelab Exercise  (0) 2022.06.21
Dart 시작하기 - 구조 및 변수  (0) 2022.06.21