[Day 48] Reactive Programming
RxSwift를 사용하여 Reactive Rrogramming하기! Part.2
📌 Observable
Observable
은 시간에 따라 발생하는 이벤트 또는 변경되는 데이터를 스트림(Stream)형태로 전달하는 객체에요!
Observable
은 크게 ❄️ Cold Observable과 🔥 HotObservable로 나뉘어 지는데 이 두개의 차이점을 알아볼까요?
❄️ Cold Observable | 🔥 HotObservable | |
---|---|---|
이벤트 시작 시점 | 구독 시작시 스트림 진행 | 생성 시점부터 스트림 진행 |
값 공유 | 구독자마다 독립적인 스트림 | 구독자 모두 동일한 이벤트 스트림 |
♻️ Observable Life Cycle
Observable
은 생성되고, next이벤트를 방출
하다 complete 이벤트
또는 errror 이벤트
를 발생시키고 종료
Subscribe시 전달되는 Disposable객체의 dispose()함수 호출!!
Create Observable
Observable을 생성 하는 방법을 알아보자!
1
2
3
4
5
6
7
8
Observable<String>.create { observer in
observer.on(.next("🐶"))
observer.on(.next("🐭"))
observer.on(.next("🐼"))
observer.on(.next("🐯"))
observer.on(.completed())
return Disposables.create()
}
Observable.create
는 개발자가 직접 이벤트 흐름을 정의할 수 있는 방법이에요!
이 예제에서는 문자열 타입의 Observable
을 생성하고 있으며, observer.on(.next(...))
를 사용해 각 이모지 값을 차례로 방출한 뒤 .completed()
이벤트로 스트림을 종료합니다.
마지막으로 Disposables.create()
는 정리 작업이 필요할 때 사용하는 클린업 로직이랍니다.
1
2
3
4
5
6
7
8
9
Observable<UIImage>.create { observer in
let imageTask = fetchImage(url: url) {
observer.on(.next($0))
observer.on(.completed)
}
return Disposables.create {
imageTask.cancel()
}
}
이렇게 비동기 네트워크 작업을 응용할 수 있는데,
fetchImage(url:)
처럼 비동기로 동작하는 이미지 다운로드 함수를 Observable<UIImage>
로 감싸서 사용자가 이미지 다운로드 흐름을 Rx 방식으로 구독하고 반응형으로 다룰 수 있어요.
다운로드가 완료되면 .next
를 통해 이미지를 전달하고, 작업이 끝나면 .completed
이벤트로 스트림을 종료합니다.
마지막 Disposables.create
내부에서 imageTask.cancel()
을 호출하면, 구독이 dispose
될 때 다운로드 작업을 중단할 수 있어 네트워크 리소스를 아낄 수 있어요.
앞에서는 Observable.create를 사용해서 커스텀 Observable을 직접 만들었는데, 매번 그렇게 만들기엔 번거로운 경우가 많아요.
그래서 RxSwift
에서는 자주 쓰이는 패턴에 맞춰 쉽게 Observable을 만들 수 있는 생성 연산자들을 제공합니다. 대표적인 연산자들을 소개해줄게요.
- just: 하나의 요소를 방출하고 .completed 이벤트로 스트림을 종료
1 2
Observable .just("🐶")
- of: 여러 개의 요소를 순서대로 방출하고 .completed 이벤트로 스트림을 종료
1 2
Observable .of("🐶", "🐭", "🐼", "🐯")
- from: Sequence의 각 요소를 순서대로 방출하고 .completed 이벤트로 스트림을 종료
1 2
Observable .from(["🐶", "🐭", "🐼", "🐯"])
- interval: 정해진 시간간격 마다 무한히 정수값 방출
1 2 3 4 5
Observable<Int> .interval( .seconds(1), scheduler: MainScheduler.instance )
- empty: 아무것도 방출하지 않고 .completed 이벤트로 스트림을 종료
1
Observable<String>.empty()
- never: 아무 이벤트도 발생시키지 않음
1
Observable<String>.never()
- error: 에러 이벤트를 방출
1 2
Observable<String> .error(MyError.oops)
이처럼 RxSwift는 다양한 상황에 맞는 Observable 생성 도구를 제공해주기 때문에, 상황에 따라 적절한 연산자를 선택해서 더 간결하고 선언적인 코드를 작성할 수 있어요.
subscribe Observable
앞에서 살펴본 Observable
은 단순히 이벤트 스트림을 정의
한 것뿐이에요.
하지만 실제로 이 스트림이 동작하려면, 누군가 이걸 구독
해야 해요.
그래서 사용하는 것이 바로 subscribe()
입니다.
subscribe()에는 내부적으로 4가지 이벤트 핸들러가 있는데 | 이벤트 | 설명 | | — | — | | onNext | 새로운 데이터가 방출될 때 실행 | | onCompleted | 모든 데이터 방출이 끝났을 때 실행 | | onError | 에러가 발생했을 때 실행 | | onDisposed | 구족이 종료되었을 때 실행 |
이 메서드를 호출해야만 Observable이 내부 이벤트를 실제로 실행하고, 우리는 그 값을 수신(onNext) 할 수 있게 됩니다.
onNext
: Observable에서 새로운 값이 방출될 때마다 호출 돼요.1 2 3 4
Observable.of("1", "2", "3") .subscribe(onNext: { number in print(number) })
이렇게 구독하면, “1”, “2”, “3” 순서로 출력되고 .completed 이벤트로 스트림이 종료돼요
onCompleted
: Observable이 모든 이벤트를 정상적으로 방출하고 나서 호출돼요.1 2 3 4 5
Observable.of("1", "2") .subscribe( onNext: { print($0) }, onCompleted: { print("완료!") } )
“1”, “2” 방출된 후 완료! 문자열을 출력
onError
: Observable 수행중 에러가 발생하면 호출돼요1 2 3 4 5 6 7
enum MyError: Error { case failed } Observable<String>.error(MyError.failed) .subscribe( onNext: { print($0) }, onError: { error in print("에러 발생: \(error)") } )
에러 발생: failed
가 출력onDisposed
: Observable이 dispose 되었을 때 호출돼요. (완료되었든, 에러가 났든, 수동으로 dispose 되었든 무조건 실행)1 2 3 4 5 6 7 8
let disposeBag = DisposeBag() Observable.of("1", "2") .subscribe( onNext: { print($0) }, onDisposed: { print("구독 종료") } ) .disposed(by: disposeBag)
“1”, “2” 방출 후 구독종료 문자열을 출력
📌 Operators
오퍼레이터(Operator)는 Observable에서 나오는 데이터 스트림을 변형하거나 필터링하는 도구들이에요.
쉽게말하면…
Observable이 데이터를 방출할 때
→ 그 데이터에 연산을 적용하거나
→ 필요한 것만 필터링하거나
→ 여러 스트림을 합치거나 하는 역할!
Operator에는 여러가지가 있어요
- 변환 (Transforming)
- 필터링 (Filtering)
- 결합 (Combining)
- 오류 (Error)
- 공유 (Sharing)
이렇게 있는데 하나하나씩 알아봐요
변환 (Transforming)
map
: 방출된 값을 원하는 형태로 변환할 때 사용해요.1 2 3
Observable<Int> .interval(.seconds(1)) .map { $0 * 10}
flatMap
: 각 이벤트에 대해 새로운 Observable을 생성하고, 그 모든 결과를 합쳐서 방출해요.1 2 3 4 5 6 7
let interval = Observable<Int> .interval(.seconds(1)) let observable = Observable .of(10, 20) interval .flatMap { _ in observable }
flatMapFirst
: 첫 번째 내부 Observable만 구독하고, 이전 작업이 끝날 때까지 다음 요청은 무시합니다.1 2 3 4 5 6 7
let interval = Observable<Int> .interval(.seconds(1)) let observable = Observable .of(10, 20, 30) interval .flatMapFirst { _ in observable }
flatMapLatest
: 가장 최근에 생성된 Observable만 구독하고, 이전 Observable은 무시합니다.1 2 3 4 5 6 7
let interval = Observable<Int> .interval(.seconds(1)) let observable = Observable .of(10, 20, 30) interval .flatMapLatest { _ in observable }
scan
: 이전 값을 누적하면서 매번 결과를 방출해요.1 2 3
Observable<Int> .interval(.seconds(1)) .scan(0, accumulator: { $0 + $1 })
reduce
: 모든 요소를 누적하여 최종 결과 한 번만 방출합니다.1 2
Observable.of(1, 2, 3, 4) .reduce(0, accumulator: +)
startWith
: Observable 앞에 기본값을 추가로 방출합니다.1 2
Observable.of(1, 2, 3, 4) .startWith(0)
💡 Tip
flatMapFirst
→ 오래 걸리는 작업이 중복 실행되지 않도록flatMapLatest
→ 가장 최신 요청만 반영 (이전 요청 무시)scan
→ 이벤트가 들어올 때마다 실시간 누적 (중간값 추적)reduce
→ 모든 이벤트를 모아서 최종 누적 결과만 방출
필터링 (Filtering)
filter
: 조건에 맞는 값만 통과시켜서 방출합니다.1 2 3
Observable<Int> .interval(.seconds(1)) .filter { $0.isMultiple(of: 2) }
distinctUntilChanged
: 이전 값과 같은 값은 무시하고, 바뀌는 값만 방출합니다.1 2
Observable.of(1, 1, 2, 3, 3) .distinctUntilChanged()
element
: 지정한 인덱스 위치의 값 하나만 방출합니다.1 2 3
Observable<Int> .interval(.seconds(1)) .element(at: 3)
take
: 앞에서부터 지정한 개수만큼 방출하고 종료합니다.1 2 3
Observable<Int> .interval(.seconds(1)) .take(3)
take(until)
: 다른 Observable이 이벤트를 방출하기 전까지 값들을 방출합니다.1 2 3 4 5
let numbers = Observable.create... let letters = Observable.create... Observable .take(until: letters)
skip
: 앞의 값들을 지정한 개수만큼 건너뛰고 이후 값부터 방출합니다.1 2
Observable<Int> .skip(3)
skip(until)
: 다른 Observable이 emit하기 전까지는 무시, emit된 순간부터 시작합니다.1 2 3 4 5
let numbers = Observable.create... let letters = Observable.create... Observable .skip(until: letters)
debounce
: 마지막 이벤트 후 일정 시간이 지나야 그 값을 방출합니다.1 2 3 4
let numbers = Observable.create... numbers .debounce(.seconds(2))
throttle
: 지정한 시간 간격마다 최신 값을 방출하거나 무시합니다.1 2 3 4
let numbers = Observable.create... numbers .throttle(.seconds(2))
throttle(latest)
: 설정 시간 내에서 가장 처음 들어온 값만 방출하고 이후는 무시합니다.1 2 3 4
let numbers = Observable.create... numbers .throttle(.seconds(2), latest: false)
💡 Tip
filter
→ 조건에 맞는 값만 선별 (ex. 짝수만!)distinctUntilChanged
→ 연속 중복 제거 (UI 상태 중복 방지에 유용)element(at:)
→ 특정 위치의 값 딱 한 번만 받기take(_:)
→ 앞에서부터 정해진 개수만 받고 스트림 종료take(until:)
→ 다른 Observable이 막을 때까지 받기 (ex. 버튼 누르면 중단)skip(_:)
→ 초반 값은 건너뛰고 시작skip(until:)
→ 신호 Observable이 emit될 때까지 기다림debounce
→ 입력이 끝나고 잠시 후 마지막 값만 받기 (검색창에 최고!)throttle
→ 지정 시간 간격마다 반응 (중복 클릭 방지)throttle(..., latest: false)
→ 간격 내 첫 값만 방출 (가장 빨리 누른 것만 인정)
결합 (Combining)
zip
: 두 Observable의 요소를 순서대로 쌍(pair)으로 묶어서 방출해요.1 2 3 4
let numbers = Observable.create... let letters = Observable.create... Observable.zip(numbers, letters)
combineLatest
: 각 Observable이 최신 값을 가지고 있을 때마다, 가장 최신 값들로 조합해서 방출해요.1 2 3 4 5
let numbers = Observable.create... let letters = Observable.create... Observable .combineLatest(numbers, letters)
withLatestFrom
: 기준 Observable이 emit될 때, 다른 Observable의 최신 값을 가져와서 방출해요.1 2 3 4 5
let numbers = Observable.create... let letters = Observable.create... numbers .withLatestFrom(letters)
merge
: 여러 Observable을 하나로 합쳐서 이벤트를 순서 상관없이 방출해요.1 2 3 4
let numbers = Observable.create... let letters = Observable.create... Observable.merge(numbers, letters)
concat
: Observable을 순차적으로 이어붙여서 방출해요.1 2 3 4
let numbers = Observable.create... let letters = Observable.create... Observable.concat(numbers, letters)
💡 Tip
zip
→ 쌍(pair)으로 묶어서 정확히 한 쌍씩 처리combineLatest
→ 각각의 최신 상태를 항상 함께 반영withLatestFrom
→ 기준 트리거로 최신 값만 가져오기 (예: 버튼 클릭)merge
→ 실시간으로 다양한 소스를 동시에 처리concat
→ 순서 보장하며 하나 끝나고 다음 시작 (직렬 처리)
오류 (Error)
retry
: 에러가 발생하면 Observable의 스트림을 처음부터 다시 실행해요.1 2
let numbers = Observable.create... numbers.retry()
catchAndReterun
: 에러가 발생했을 때 지정된 기본값을 방출하고 스트림을 종료해요.1 2
let numbers = Observable.create... numbers.catchAndReturn(0)
catch
: 에러가 발생하면 다른 Observable로 대체해서 계속 스트림을 이어가요.1 2
let numbers = Observable.create... numbers.catch { _ in newObservable}
💡 Tip
retry
→ 실패한 작업을 다시 시도할 때 (ex. 네트워크)catchAndReturn
→ 에러 발생 시 기본값 하나로 대체catch
→ 에러 발생 시 Observable 자체를 바꿔서 계속 진행
공유 (Sharing)
share
: 하나의 observable을 여러 구독자(Subscriber)가 공유할 수 있게 해주는 연산자1 2 3 4 5 6 7 8 9
let interval = Observable<Int> .interval(.seconds(1)) .share(reply: 1) interval //1번 .subscribe() interval //2번 .subscribe()
이 친구(share(replay:)
)는 설명이 조금 필요해요!
interval
처럼 시간이 흐르면서 데이터를 방출하는 Observable
이 있을 때, share(replay: 1)
을 붙이면 최근에 방출한 이벤트 1개를 버퍼에 저장
해두고 다른 구독자가 나중에 구독해도 그 값을 바로 받을 수 있게 해줘요.
예를 들어, 첫 번째 구독자가 1, 2, 3까지 받았고, 3.5초 뒤에 두 번째 구독자가 구독을 시작했다고 하면…
- replay(1) 덕분에 최근 값 3을 바로 받고
- 그 뒤로는 4, 5, 6… 을 첫 번째 구독자와 함께 공유해서 받아요.
즉, 같은 스트림을 중복 실행하지 않고, 최근 값도 보존하면서 새로운 구독자도 자연스럽게 참여할 수 있게 해주는 거죠!
- 💡 Tip
share
→ 공통된 리소스(네트워크, 타이머 등)를 효율적으로 공유할 때 사용해요.replay
→ 파라미터를 통해 이전에 방출된 값을 새 구독자에게 재전달할 수도 있어요.- 구독자가 없을 땐 자동으로 해제되므로 리소스 낭비도 방지 가능!
기타 주요 연산자
debug
: 어떤 이벤트가 어떻게 흐르는지 로그로 출력해줘요!Observable<Int>.interval(.seconds(1)) .debug("identifier", trimOutput: true)
identifier
이름을 붙여서 출력할 수 있고,trimOutput: true
로 하면 콘솔로그를 간결하게 보여줘요.do
: 스트림에 영향을 주지 않고, 중간에 이벤트를 감지하거나 부수작업(사이드이펙트)을 할 수 있어요.1 2 3 4 5 6 7 8 9 10 11
.do( onNext: { element in }, afterNext: { element in }, onError: { error in }, afterError: { error in }, onCompleted: { }, afterCompleted: { }, onSubscribe: { }, onSubscribed: { }, onDispose: { } )
- 💡 Tip
- debug → 스트림 흐름을 전체적으로 추적하고 싶을 때(모든 이벤트로그를 출력)
- debug(“id”, trimOutput: true) → 로그 구분하고 싶을 때
- do → 중간 로그, 애니메이션, 사이드이펙트, 로딩 상태 처리 등 (값은 그대로 통과)
맺으며: 춘식이와 함께하는 스마트한 개발
Part.3에 이어서~
오늘도 오류 내며 자란 춘식이였습니다. 🐾 더 열심히 노력 해서 좋은 iOS Developer가 되자!
혹시 이 글을 보시는 분들 중 더 유용한 팁이 있다면 댓글로 많이 알려주세요!