Giới thiệu về Reactive Programing trong javascript

Có thể hình dung stream như là một array đặc biệt , chứa một tập các phần tử đặc biệt , các phần tử này có thể emit: 1. value, 2. error, 3. complete, các phần tử trong stream cũng không có hết ngay từ đầu, mà sẽ xuất hiện ở một thời điểm ko xác định trong tương lai.

Reactive programming is programming with asynchronous data streams

Có thể thấy ngay Reactive programing khá trừu tượng, nhưng do thay vì implement những ràng buộc một cách chi tiết, những ràng buộc này được gắn vào từng data gửi đi trên stream, code nó sẽ gọn gàng hơn.

Kiểu viết này sẽ mang phong cách declarative hơn là imperative, chúng ta không khai báo từng bước tuần tự cần làm gì, chúng ta chỉ khai báo mối quan hệ giữa các stream với nhau.

10 năm trước, mọi việc chỉ đơn giản là submit toàn bộ giá trị các field lên backend xử lý, rồi đơn thuần hiển thị kết quả trả về, bây giờ user thích real-time feedback, bấm “like” một phát là đầu bên kia thấy được liền.

Những event real-time như thế, user khoái, chúng ta cần có một công cụ lập trình để làm việc đó, Reactive Program ra đời cũng từ yêu cầu của user.

Implement hộp thoại “Who to follow” của twitter

Mình sẽ sử dụng RxJS trong ví dụ, vì mình chỉ biết javascript thôi các bạn.

Tính năng chính của hộp thoại này

  • Vừa mở lên, load data từ API, hiển thị 3 tài khoản

Chúng ta tiếp cận với vấn đề này như thế nào, gần như mọi thứ có thể xem là stream.

Load dữ liệu lúc đầu

Bắt đầu với tính năng đơn giản nhất “Mới vào, load 3 account từ API”. (1) gửi 1 request (2) nhận response (3) render kết quả

Lúc bắt đầu chúng ta chỉ có 1 request, mọi thứ rất đơn giản, yên tâm là nó sẽ phức tạp dần lên khi có nhiều request. Mô phỏng nó như data stream, stream này chỉ có 1 emit value.

Khi có một event request xảy ra, nó báo 2 việc: khi nào và cái gì. Khi nào event này được emit và cái gì chính là value được emit (url string)

Trong Rx, bà con gọi stream là Observable, mình thích gọi là stream hơn

Khi emit value, chúng ta subscribe để thực thi một hành động tiếp theo

// execute the request
jQuery.getJSON(requestUrl, function(responseData) {
// …
});
}

Cái response của request cũng là một dạng stream, dữ liệu sẽ đến tại một thời điểm không xác định trong tương lai

requestStream.subscribe(function(requestUrl) {
  // execute the request
  var responseStream = Rx.Observable.create(function (observer) {
    jQuery.getJSON(requestUrl)
    .done(function(response) { observer.onNext(response); })
    .fail(function(jqXHR, status, error) { observer.onError(error); })
    .always(function() { observer.onCompleted(); });
  });
  
  responseStream.subscribe(function(response) {
    // do something with the response
  });
}

Rx.Observable.create() sẽ tạo ra những stream mới, qua việc thông báo cho các observer đang subscriber các sự kiện onNext()onError().

Nó giống cách chạy của Promise lắm đúng không? Vâng Observable là một dạng Promise++, phiên bản mở rộng.

Chúng ta có 1 subscribe bên trong 1 subscribe khác, nó giống như callback hell. Thêm nữa việc tạo responseStream hoàn toàn độc lập với requestStream. Trong Rx chúng ta có một cách đơn giản để transform và tạo một stream mới từ những thằng khác

Hàm map(f), sẽ lấy từng giá trị của stream A, gọi function f(), và trả về giá trị cho stream B. Tạo một stream này từ stream khác, y như hàm map của array thôi mà.

var responseMetastream = requestStream
  .map(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

Sau đó chúng ta tạo một stream của stream metastream. Bắt đầu phức tạp rồi đó. Metastream là 1 stream mà mỗi cái value được emit sẽ trỏ ra 1 stream khác. Trong ví dụ, mỗi URL request, được trỏ đến một stream promise chứa response

Với responseStream, chúng ta chỉ một đơn giản một stream chứa response, nên việc tạo một metastream cho response sẽ rối và không cần. Mỗi giá trị được emit của response sẽ là một object JSON, không phải một Promise của object JSON. Sử dụng .flatMap() để gộp tất cả response thành 1 stream, .flatMap là operator để xử lý dữ liệu async trong Rx

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

responseStream được khai báo bởi requestStream, nếu sau này có thêm các sự kiện trên requestStream, chúng ta sẽ có một event response tương ứng trên responseStream

Sau khi có được responseStream, chúng ta render thôi

responseStream.subscribe(function(response) {
  // render `response` to the DOM however you wish
});

Toàn bộ bode bây giờ


var responseStream = requestStream
.flatMap(function(requestUrl) {
return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});

responseStream.subscribe(function(response) {
// render `response` to the DOM however you wish
});

Nút refresh

JSON trả về từ API sẽ có 100 user, nó chỉ cho thêm offset, không cho set page size, chúng ta chỉ cần 3 user, lãng phí hết 97 user. Tạm thời không quan tâm phần này, chúng ta sẽ cache lại cái response sau.

  • cập nhập lại requestStream để nó phụ thuộc vào refreshStream

RxJS có hàm để chuyển event thành stream

var refreshButton = document.querySelector('.refresh');
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
  });

Phải tách stream này ra riêng

.map(function() {
var randomOffset = Math.floor(Math.random()*500);
});

Sau đó mới .merge() lại

vvvvvvvvv merge vvvvvvvvv
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
});


var requestStream = Rx.Observable.merge(
requestOnRefreshStream, startupRequestStream
);

Có cách gọn hơn, không cần đến một stream trung gian

.map(function() {
var randomOffset = Math.floor(Math.random()*500);
})

Thậm chí gọn hơn nữa

.map(function() {
var randomOffset = Math.floor(Math.random()*500);
})
.map(function() {
var randomOffset = Math.floor(Math.random()*500);
});
// clear 3 sugesstion
})

Tuy nhiên, responseStream cũng đang có 1 subscribe ảnh hướng đến việc render, như vậy việc render này cũng tạo thêm 1 stream (có 2 sự kiện emit value để render)

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  });

Chúng ta cũng sẽ có suggestion2Streamsuggestion3StreamsuggestionNStream hoàn toàn giống với suggestion1Stream, nhưng mình sẽ để các bạn tự suy nghĩ cách giải quyết. Ví dụ này chỉ đề cập đến suggestion1Stream

Thay vì render trên subscribe của responseStream

suggestion1Stream.subscribe(function(suggestion) {
  // render the 1st suggestion to the DOM
});
var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
  );

Với trường hợp null, đơn giản render thông báo

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // hide the first suggestion DOM element
  }
  else {
    // show the first suggestion DOM element
    // and render the data
  }
});

Hình dung quá trình này như sau, trong đó N là giá trị null

var close1Button = document.querySelector('.close1');

  .map(function(){
    var randomOffset = Math.floor(Math.random()*500);
  })

Không chạy, nó sẽ remove user và tải mới 3 suggestion luôn. Vì cái API của chúng ta xài nó load 1 lần 100 user, nên giờ chúng ta chỉ lấy các user nào chưa hiển thị luôn, không cần refresh mới.

Suy nghĩ theo hướng stream, khi event close1 xuất hiện, chúng ta lấy emit response mới nhất trên responseStream, rồi lấy ngẫu nhiên 1 user

Operator là combineLatest sẽ nhận vào 2 stream A, B, khi 1 trong 2 stream có emit value, combineLatest sẽ join 2 value emit gần nhất ab rồi trả về c = f(x, y), trong đó f là function chúng ta khai báo

vvvvvvvv combineLatest(f) vvvvvvv
return listUsers[Math.floor(Math.random()*listUsers.length)];
})
.merge(
)
.startWith(null);
return listUsers[Math.floor(Math.random()*listUsers.length)];
}
)
.merge(
)
.startWith(null);

Tổng kết

Toàn bộ code

var refreshButton = document.querySelector('.refresh');

var closeButton1 = document.querySelector('.close1');
// and the same logic for close2 and close3

  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
  });

var responseStream = requestStream
  .flatMap(function (requestUrl) {
    return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
  });

      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
  )
  .startWith(null);
// and the same logic for suggestion2Stream and suggestion3Stream

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // hide the first suggestion DOM element
  }
  else {
    // show the first suggestion DOM element
    // and render the data
  }
});