通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

如果我们获取这个可观察对象并向其添加多个订阅者:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

我们要做的是确保这不会导致多个网络请求。

这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。

在RxJs 5中正确的方法是什么?

也就是说,这似乎工作得很好:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?

注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res. JSON())部分现在都是无用的,因为现在默认假设JSON结果。


当前回答

你可以简单地使用ngx-cacheable!它更适合你的场景。

使用这个的好处 它只调用rest API一次,缓存响应并为接下来的请求返回相同的响应。 在创建/更新/删除操作后,可以根据需要调用API。

那么,你的服务等级应该是这样的

import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';

const customerNotifier = new Subject();

@Injectable()
export class customersService {

    // relieves all its caches when any new value is emitted in the stream using notifier
    @Cacheable({
        cacheBusterObserver: customerNotifier,
        async: true
    })
    getCustomer() {
        return this.http.get('/someUrl').map(res => res.json());
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    addCustomer() {
        // some code
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    updateCustomer() {
        // some code
    }
}

这里有更多的参考链接。

其他回答

上面的大多数答案都适用于不接受输入的http请求。每次你想要使用一些输入进行api调用时,都需要重新创建请求。上面唯一可以处理这个问题的回复是@Arlo的回复。

我已经创建了一个稍微简单的装饰器,您可以使用它将响应共享给每个具有相同输入的调用者。与Arlo的回复不同,它不会对延迟的订阅者重放响应,而是将同时发生的请求作为一个请求来处理。如果目标是重放响应给延迟的观察者(也就是缓存的响应),你可以修改下面的代码并用shaereplay(1)替换share():

https://gist.github.com/OysteinAmundsen/b97a2359292463feb8c0e2270ed6695a

import { finalize, Observable, share } from 'rxjs';

export function SharedObservable(): MethodDecorator {
  const obs$ = new Map<string, Observable<any>>();
  return (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) => {
    const originalMethod = descriptor.value;
    descriptor.value = function (...args: any[]) {
      const key = JSON.stringify(args);
      if (!obs$.has(key)) {
        // We have no observable for this key yet, so we create one
        const res = originalMethod.apply(this, args).pipe(
          share(), // Make the observable hot
          finalize(() => obs$.delete(key)) // Cleanup when observable is complete
        );
        obs$.set(key, res);
      }
      // Return the cached observable
      return obs$.get(key);
    };
    return descriptor;
  };
}

用法:

@SharedObservable()
myFunc(id: number): Observable<any> {
  return this.http.get<any>(`/api/someUrl/${id}`);
}

根据@Cristian的建议,这是一种适用于HTTP可观察对象的方法,它只发射一次,然后就完成了:

getCustomer() {
    return this.http.get('/someUrl')
        .map(res => res.json()).publishLast().refCount();
}

更新:Ben Lesh说在5.2.0之后的下一个小版本中,你将能够调用shareplay()来真正地缓存。

以前……

首先,不要使用share()或publishReplay(1). refcount(),它们是相同的,它的问题是,它只在可观察对象处于活动状态时建立连接时共享,如果你在它完成连接后,它会再次创建一个新的可观察对象,翻译,而不是真正的缓存。

Birowski给出了正确的解决方案,即使用ReplaySubject。ReplaySubject将缓存你给它的值(bufferSize),在我们的例子中是1。它不会像share()一样在refCount为零时创建一个新的可观察对象,并且你建立了一个新的连接,这是缓存的正确行为。

这是一个可重用函数

export function cacheable<T>(o: Observable<T>): Observable<T> {
  let replay = new ReplaySubject<T>(1);
  o.subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  return replay.asObservable();
}

下面是如何使用它

import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';

@Injectable()
export class SettingsService {
  _cache: Observable<any>;
  constructor(private _http: Http, ) { }

  refresh = () => {
    if (this._cache) {
      return this._cache;
    }
    return this._cache = cacheable<any>(this._http.get('YOUR URL'));
  }
}

下面是一个更高级版本的可缓存函数。这个函数允许有自己的查找表+提供自定义查找表的能力。这样的话,你就不用检查了。_cache就像上面的例子。还要注意的是,你传递的不是可观察对象作为第一个参数,而是一个返回可观察对象的函数,这是因为Angular的Http会立即执行,所以通过返回一个延迟执行的函数,如果它已经在缓存中,我们可以决定不调用它。

let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
  if (!!key && (customCache || cacheableCache)[key]) {
    return (customCache || cacheableCache)[key] as Observable<T>;
  }
  let replay = new ReplaySubject<T>(1);
  returnObservable().subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  let observable = replay.asObservable();
  if (!!key) {
    if (!!customCache) {
      customCache[key] = observable;
    } else {
      cacheableCache[key] = observable;
    }
  }
  return observable;
}

用法:

getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")

我把问题打了星号,但我会试着试一下。

//this will be the shared observable that 
//anyone can subscribe to, get the value, 
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);

getCustomer().subscribe(customer$);

//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));

//here's the second subscriber
setTimeout(() => {
  customer$.subscribe(val => console.log('subscriber 2: ' + val));  
}, 1000);

function getCustomer() {
  return new Rx.Observable(observer => {
    console.log('api request');
    setTimeout(() => {
      console.log('api response');
      observer.next('customer object');
      observer.complete();
    }, 500);
  });
}

这是证据:)

这里只有一个要点:getCustomer().subscribe(customer$)

我们不是订阅getCustomer()的api响应,我们是订阅一个ReplaySubject,它是可观察的,它也可以订阅一个不同的可观察对象,并且(这很重要)持有它最后发出的值并重新发布给它的任何(ReplaySubject的)订阅者。

RXJS 5.3.0

我对.map(myFunction).publishReplay(1).refCount()不满意

对于多个订阅者,.map()在某些情况下执行myFunction两次(我希望它只执行一次)。一个修复似乎是publishReplay(1).refCount().take(1)

你可以做的另一件事,就是不使用refCount(),让Observable立即热:

let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;

这将启动HTTP请求,而不考虑订阅者。我不确定在HTTP GET完成之前取消订阅是否会取消它。