通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

如果我们获取这个可观察对象并向其添加多个订阅者:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

我们要做的是确保这不会导致多个网络请求。

这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。

在RxJs 5中正确的方法是什么?

也就是说,这似乎工作得很好:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?

注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res. JSON())部分现在都是无用的,因为现在默认假设JSON结果。


当前回答

上面的大多数答案都适用于不接受输入的http请求。每次你想要使用一些输入进行api调用时,都需要重新创建请求。上面唯一可以处理这个问题的回复是@Arlo的回复。

我已经创建了一个稍微简单的装饰器,您可以使用它将响应共享给每个具有相同输入的调用者。与Arlo的回复不同,它不会对延迟的订阅者重放响应,而是将同时发生的请求作为一个请求来处理。如果目标是重放响应给延迟的观察者(也就是缓存的响应),你可以修改下面的代码并用shaereplay(1)替换share():

https://gist.github.com/OysteinAmundsen/b97a2359292463feb8c0e2270ed6695a

import { finalize, Observable, share } from 'rxjs';

export function SharedObservable(): MethodDecorator {
  const obs$ = new Map<string, Observable<any>>();
  return (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) => {
    const originalMethod = descriptor.value;
    descriptor.value = function (...args: any[]) {
      const key = JSON.stringify(args);
      if (!obs$.has(key)) {
        // We have no observable for this key yet, so we create one
        const res = originalMethod.apply(this, args).pipe(
          share(), // Make the observable hot
          finalize(() => obs$.delete(key)) // Cleanup when observable is complete
        );
        obs$.set(key, res);
      }
      // Return the cached observable
      return obs$.get(key);
    };
    return descriptor;
  };
}

用法:

@SharedObservable()
myFunc(id: number): Observable<any> {
  return this.http.get<any>(`/api/someUrl/${id}`);
}

其他回答

更新:Ben Lesh说在5.2.0之后的下一个小版本中,你将能够调用shareplay()来真正地缓存。

以前……

首先,不要使用share()或publishReplay(1). refcount(),它们是相同的,它的问题是,它只在可观察对象处于活动状态时建立连接时共享,如果你在它完成连接后,它会再次创建一个新的可观察对象,翻译,而不是真正的缓存。

Birowski给出了正确的解决方案,即使用ReplaySubject。ReplaySubject将缓存你给它的值(bufferSize),在我们的例子中是1。它不会像share()一样在refCount为零时创建一个新的可观察对象,并且你建立了一个新的连接,这是缓存的正确行为。

这是一个可重用函数

export function cacheable<T>(o: Observable<T>): Observable<T> {
  let replay = new ReplaySubject<T>(1);
  o.subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  return replay.asObservable();
}

下面是如何使用它

import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';

@Injectable()
export class SettingsService {
  _cache: Observable<any>;
  constructor(private _http: Http, ) { }

  refresh = () => {
    if (this._cache) {
      return this._cache;
    }
    return this._cache = cacheable<any>(this._http.get('YOUR URL'));
  }
}

下面是一个更高级版本的可缓存函数。这个函数允许有自己的查找表+提供自定义查找表的能力。这样的话,你就不用检查了。_cache就像上面的例子。还要注意的是,你传递的不是可观察对象作为第一个参数,而是一个返回可观察对象的函数,这是因为Angular的Http会立即执行,所以通过返回一个延迟执行的函数,如果它已经在缓存中,我们可以决定不调用它。

let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
  if (!!key && (customCache || cacheableCache)[key]) {
    return (customCache || cacheableCache)[key] as Observable<T>;
  }
  let replay = new ReplaySubject<T>(1);
  returnObservable().subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  let observable = replay.asObservable();
  if (!!key) {
    if (!!customCache) {
      customCache[key] = observable;
    } else {
      cacheableCache[key] = observable;
    }
  }
  return observable;
}

用法:

getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")

我写了一个缓存类,

/**
 * Caches results returned from given fetcher callback for given key,
 * up to maxItems results, deletes the oldest results when full (FIFO).
 */
export class StaticCache
{
    static cachedData: Map<string, any> = new Map<string, any>();
    static maxItems: number = 400;

    static get(key: string){
        return this.cachedData.get(key);
    }

    static getOrFetch(key: string, fetcher: (string) => any): any {
        let value = this.cachedData.get(key);

        if (value != null){
            console.log("Cache HIT! (fetcher)");
            return value;
        }

        console.log("Cache MISS... (fetcher)");
        value = fetcher(key);
        this.add(key, value);
        return value;
    }

    static add(key, value){
        this.cachedData.set(key, value);
        this.deleteOverflowing();
    }

    static deleteOverflowing(): void {
        if (this.cachedData.size > this.maxItems) {
            this.deleteOldest(this.cachedData.size - this.maxItems);
        }
    }

    /// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration.
    /// However that seems not to work. Trying with forEach.
    static deleteOldest(howMany: number): void {
        //console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size);
        let iterKeys = this.cachedData.keys();
        let item: IteratorResult<string>;
        while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){
            //console.debug("    Deleting: " + item.value);
            this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS.
        }
    }

    static clear(): void {
        this.cachedData = new Map<string, any>();
    }

}

由于我们使用它的方式不同,所以它都是静态的,但是可以随意地将它变成一个正常的类和服务。我不确定angular是否会一直保持一个实例(对于Angular2来说是新的)。

我是这样使用它的:

            let httpService: Http = this.http;
            function fetcher(url: string): Observable<any> {
                console.log("    Fetching URL: " + url);
                return httpService.get(url).map((response: Response) => {
                    if (!response) return null;
                    if (typeof response.json() !== "array")
                        throw new Error("Graph REST should return an array of vertices.");
                    let items: any[] = graphService.fromJSONarray(response.json(), httpService);
                    return array ? items : items[0];
                });
            }

            // If data is a link, return a result of a service call.
            if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link")
            {
                // Make an HTTP call.
                let url = this.data[verticesLabel][name]["link"];
                let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher);
                if (!cachedObservable)
                    throw new Error("Failed loading link: " + url);
                return cachedObservable;
            }

我认为可能有更聪明的方法,使用一些可观察的技巧,但这对我的目的来说已经很好了。

您是否试过运行已经拥有的代码?

因为你是根据getJSON()的承诺构造Observable,所以网络请求是在任何人订阅之前发出的。最终的承诺由所有订阅者共享。

var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...

你可以构建一个简单的类Cacheable<>来帮助管理从多个订阅者的http服务器检索到的数据:

declare type GetDataHandler<T> = () => Observable<T>;

export class Cacheable<T> {

    protected data: T;
    protected subjectData: Subject<T>;
    protected observableData: Observable<T>;
    public getHandler: GetDataHandler<T>;

    constructor() {
      this.subjectData = new ReplaySubject(1);
      this.observableData = this.subjectData.asObservable();
    }

    public getData(): Observable<T> {
      if (!this.getHandler) {
        throw new Error("getHandler is not defined");
      }
      if (!this.data) {
        this.getHandler().map((r: T) => {
          this.data = r;
          return r;
        }).subscribe(
          result => this.subjectData.next(result),
          err => this.subjectData.error(err)
        );
      }
      return this.observableData;
    }

    public resetCache(): void {
      this.data = null;
    }

    public refresh(): void {
      this.resetCache();
      this.getData();
    }

}

使用

声明Cacheable<>对象(假设是服务的一部分):

list: Cacheable<string> = new Cacheable<string>();

和处理程序:

this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}

从组件调用:

//gets data from server
List.getData().subscribe(…)

您可以有多个组件订阅到它。

更多细节和代码示例在这里:http://devinstance.net/articles/20171021/rxjs-cacheable

只需使用这个缓存层,它就可以完成您需要的一切,甚至还可以管理ajax请求的缓存。

http://www.ravinderpayal.com/blogs/12Jan2017-Ajax-Cache-Mangement-Angular2-Service.html

用起来就这么简单

@Component({
    selector: 'home',
    templateUrl: './html/home.component.html',
    styleUrls: ['./css/home.component.css'],
})
export class HomeComponent {
    constructor(AjaxService:AjaxService){
        AjaxService.postCache("/api/home/articles").subscribe(values=>{console.log(values);this.articles=values;});
    }

    articles={1:[{data:[{title:"first",sort_text:"description"},{title:"second",sort_text:"description"}],type:"Open Source Works"}]};
}

层(作为一个可注入的angular服务)是

import { Injectable }     from '@angular/core';
import { Http, Response} from '@angular/http';
import { Observable }     from 'rxjs/Observable';
import './../rxjs/operator'
@Injectable()
export class AjaxService {
    public data:Object={};
    /*
    private dataObservable:Observable<boolean>;
     */
    private dataObserver:Array<any>=[];
    private loading:Object={};
    private links:Object={};
    counter:number=-1;
    constructor (private http: Http) {
    }
    private loadPostCache(link:string){
     if(!this.loading[link]){
               this.loading[link]=true;
               this.links[link].forEach(a=>this.dataObserver[a].next(false));
               this.http.get(link)
                   .map(this.setValue)
                   .catch(this.handleError).subscribe(
                   values => {
                       this.data[link] = values;
                       delete this.loading[link];
                       this.links[link].forEach(a=>this.dataObserver[a].next(false));
                   },
                   error => {
                       delete this.loading[link];
                   }
               );
           }
    }

    private setValue(res: Response) {
        return res.json() || { };
    }

    private handleError (error: Response | any) {
        // In a real world app, we might use a remote logging infrastructure
        let errMsg: string;
        if (error instanceof Response) {
            const body = error.json() || '';
            const err = body.error || JSON.stringify(body);
            errMsg = `${error.status} - ${error.statusText || ''} ${err}`;
        } else {
            errMsg = error.message ? error.message : error.toString();
        }
        console.error(errMsg);
        return Observable.throw(errMsg);
    }

    postCache(link:string): Observable<Object>{

         return Observable.create(observer=> {
             if(this.data.hasOwnProperty(link)){
                 observer.next(this.data[link]);
             }
             else{
                 let _observable=Observable.create(_observer=>{
                     this.counter=this.counter+1;
                     this.dataObserver[this.counter]=_observer;
                     this.links.hasOwnProperty(link)?this.links[link].push(this.counter):(this.links[link]=[this.counter]);
                     _observer.next(false);
                 });
                 this.loadPostCache(link);
                 _observable.subscribe(status=>{
                     if(status){
                         observer.next(this.data[link]);
                     }
                     }
                 );
             }
            });
        }
}