我正试着理清思路。我喜欢可观察对象解决开发和可读性问题的方式。正如我读到的,好处是巨大的。

HTTP和集合上的可观察对象似乎很直接。我怎么把这样的东西转换成可观察的模式。

这来自我的服务组件,用于提供身份验证。我希望它能像Angular2中的其他HTTP服务一样工作——支持数据、错误和完成处理程序。

firebase.auth().createUserWithEmailAndPassword(email, password)
  .then(function(firebaseUser) {
    // do something to update your UI component
    // pass user object to UI component
  })
  .catch(function(error) {
    // Handle Errors here.
    var errorCode = error.code;
    var errorMessage = error.message;
    // ...
  });

如果有任何帮助,我将不胜感激。我拥有的唯一替代解决方案是创建eventemitter。但我想在服务部门这样做是很糟糕的


试试这个:

import 'rxjs/add/observable/fromPromise';
import { Observable } from "rxjs/Observable";

const subscription = Observable.fromPromise(
    firebase.auth().createUserWithEmailAndPassword(email, password)
);
subscription.subscribe(firebaseUser => /* Do anything with data received */,
                       error => /* Handle error here */);

你可以在这里找到fromPromise操作符的完整引用。

如果你使用的是RxJS 6.0.0:

import { from } from 'rxjs';
const observable = from(promise);

您还可以使用Subject并从promise触发它的next()函数。请看下面的例子:

添加如下代码(我使用的是service)

class UserService { private createUserSubject: Subject < any > ; createUserWithEmailAndPassword() { if (this.createUserSubject) { return this.createUserSubject; } else { this.createUserSubject = new Subject < any > (); firebase.auth().createUserWithEmailAndPassword(email, password) .then(function(firebaseUser) { // do something to update your UI component // pass user object to UI component this.createUserSubject.next(firebaseUser); }) .catch(function(error) { // Handle Errors here. var errorCode = error.code; var errorMessage = error.message; this.createUserSubject.error(error); // ... }); } } }

从组件创建用户,如下所示

UserComponent { 构造函数(私有userService: userService) { this.userService.createUserWithEmailAndPassword()。订阅(用户=> console.log(用户),错误=> console.log(错误); } }

直接执行/转换

使用from直接将先前创建的Promise转换为可观察对象。

import { from } from 'rxjs';

// getPromise() is called once, the promise is passed to the Observable
const observable$ = from(getPromise());

observable$将是一个热observable,有效地向订阅者重放Promises值。

它是一个热门可观察对象,因为生产者(在这个例子中是Promise)是在可观察对象之外创建的。多个订阅者将共享相同的承诺。如果内部Promise已经被解析,Observable的新订阅者将立即获得它的值。

2每个订阅的延迟执行

使用deferred和Promise工厂函数一起作为输入,将Promise的创建和转换延迟到可观察对象。

import { defer } from 'rxjs';

// getPromise() is called every time someone subscribes to the observable$
const observable$ = defer(() => getPromise());

observable$将是一个冷的observable。

它是一个冷可观察对象,因为生产者(Promise)是在可观察对象内部创建的。每个订阅者将通过调用给定的Promise工厂函数来创建一个新的Promise。

这允许你创建一个可观察对象$,而无需立即创建并执行Promise,也无需与多个订阅者共享这个Promise。 每个可观察对象$的订阅者有效地调用from(promiseFactory()).subscribe(subscriber)。所以每个订阅者创建并转换自己的新Promise到一个新的Observable,并将自己附加到这个新的Observable上。

许多运营商直接接受承诺

大多数组合(例如merge, concat, forkJoin, combineLatest…)或转换可观察对象(例如switchMap, mergeMap, concatMap, catchError…)的RxJS操作符直接接受promise。如果你正在使用它们中的一个,你不需要首先使用from来包装一个承诺(但要创建一个冷的可观察对象,你仍然可能需要使用defer)。

// Execute two promises simultaneously
forkJoin(getPromise(1), getPromise(2)).pipe(
  switchMap(([v1, v2]) => v1.getPromise(v2)) // map to nested Promise
)

检查文档或实现,看看你使用的操作符是否接受ObservableInput或SubscribableOrPromise。

type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;
// Note the PromiseLike ----------------------------------------------------v
type SubscribableOrPromise<T> = Subscribable<T> | Subscribable<never> | PromiseLike<T> | InteropObservable<T>;

示例中from和defer的区别:https://stackblitz.com/edit/rxjs-6rb7vf

const getPromise = val => new Promise(resolve => {
  console.log('Promise created for', val);
  setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000);
});

// the execution of getPromise('FROM') starts here, when you create the promise inside from
const fromPromise$ = from(getPromise('FROM'));
const deferPromise$ = defer(() => getPromise('DEFER'));

fromPromise$.subscribe(console.log);
// the execution of getPromise('DEFER') starts here, when you subscribe to deferPromise$
deferPromise$.subscribe(console.log);

defer可能是大多数人都在寻找的操作符,因为许多应用程序依赖于Observables是冷的,并在订阅时触发数据获取。from在某些情况下仍然是一个可行的选择,例如,当你想在某个初始化过程中创建一个Promise,然后通过一个将被多次订阅的可观察对象传播它的值,但不想为每个订阅者再次创建和执行Promise。

你也可以使用defer。主要的区别是,承诺不会立即解决或拒绝。

你可以为promise功能添加包装器,将Observable返回给观察者。

使用defer()操作符创建一个Lazy Observable,它允许你只在观察者订阅时创建Observable。

import { of, Observable, defer } from 'rxjs'; 
import { map } from 'rxjs/operators';


function getTodos$(): Observable<any> {
  return defer(()=>{
    return fetch('https://jsonplaceholder.typicode.com/todos/1')
      .then(response => response.json())
      .then(json => {
        return json;
      })
  });
}

getTodos$().
 subscribe(
   (next)=>{
     console.log('Data is:', next);
   }
)

import { from } from 'rxjs';

from(firebase.auth().createUserWithEmailAndPassword(email, password))
.subscribe((user: any) => {
      console.log('test');
});

下面是一个简短的版本,结合了上面的一些答案,将你的代码从承诺转换为可观察对象。

Rxjs提供了toPromise()运算符, 如代码示例所示:

@Injectable({
  providedIn: 'root'
})
export class InventoryService {
  constructor(private httpClient: HttpClient) {}

  getCategories(): Observable<Category[]> {
    const url = 'https://www.themealdb.com/api/json/v1/1/categories.php';

    return this.httpClient.get<CategoriesResponse>(url).pipe(
      map(response => response.categories)
    );
  }
}

在你的组件中,你可以应用toPromise()操作符:

export class AppComponent {
  categories: any[];

  constructor(private inventoryService: InventoryService) {}

  public async loadCategories() {
    this.categories = await this.inventoryService
      .getCategories()
      .**toPromise()**

但目前Rxjs7+已弃用,建议使用lastValueFrom()操作符:

  public async loadCategories() {
    const categories$ = this.inventoryService.getCategories();
    this.categories = await **lastValueFrom**(categories$);
  }

我希望它对更新版本的更新代码有所帮助:')

将promise转换为可观察对象的正确模式是使用defer和from操作符:

import { defer, from } from 'rxjs';
    
const observable$ = defer(() => from(myPromise()));

为什么我们需要延迟运算符?

承诺是热切的,这意味着一旦被召唤,它们就会立即开火。这与可观察对象的工作原理相反。可观察对象是懒惰的,它们只在调用.subscribe()时被触发。这就是为什么我们总是需要把它包装成一个延迟运算符的原因。from操作符不做这项工作,所以总是需要defer。