diff --git a/app/javascript/components/SocketProvider/SocketProvider.js b/app/javascript/components/SocketProvider/SocketProvider.js index 7c9029c..02cbabb 100644 --- a/app/javascript/components/SocketProvider/SocketProvider.js +++ b/app/javascript/components/SocketProvider/SocketProvider.js @@ -1,11 +1,17 @@ /* global App:false */ import { Component } from 'react'; +import makeSubject from 'callbag-subject'; +import pipe from 'callbag-pipe'; +import map from 'callbag-map'; +import subscribe from 'callbag-subscribe'; + export class SocketProvider extends Component { constructor( props ) { super( props ); this.state = {}; + this.series = makeSubject(); this.publish = this.publish.bind( this ); } @@ -15,6 +21,29 @@ export class SocketProvider extends Component { } componentDidMount() { + this.pipeline = pipe( + this.series, + sample( 500 ), + map( ({ responses = [] }) => { + if ( responses.length === 0 ) return 0; + + return responses.reduce( ( acc, { value }) => acc + value, 0 ) / responses.length; + }), + subscribe( average => { + this.setState( ({ series = [] }) => { + return { + average, + series: [ + ...series.slice( -60 ), + average + ] + }; + }); + + console.log( 'Pipeline:', average ); + }) + ); + this.subscription = App.cable.subscriptions.create({ channel: "IssueChannel" }, { connected: function() { console.log('connected to IssueChannel') @@ -25,6 +54,7 @@ export class SocketProvider extends Component { }, received: ( data ) => { + this.series( 1, data ); this.setState( data ); } }); @@ -37,3 +67,46 @@ export class SocketProvider extends Component { this.subscription.send({ uuid, ...data }); } } + +const sample = ( period = 1000 ) => source => ( start, sink ) => { + let talkback; + let latest; + let interval; + + function receive( data ) { + latest = data; + + if ( !interval ) { + interval = setInterval( send, period ); + + send(); + } + } + + function send() { + sink( 1, latest ); + } + + function terminate( err ) { + clearInterval( interval ); + interval = null; + + // Send any pending data before terminating. + send(); + sink( 2, err ); + } + + if ( start === 0 ) { + source( 0, ( t, d ) => { + if ( t === 0 ) talkback = d; + if ( t === 1 ) receive( d ); + if ( t === 0 || t === 1 ) talkback( 1 ); + if ( t === 2 ) terminate( d ); + }); + + sink( 0, ( t ) => { + // Not pullable, so ignore t === 1. + if ( t === 2 && talkback ) talkback( 2 ); + }); + } +} diff --git a/app/javascript/components/Spark/Spark.css b/app/javascript/components/Spark/Spark.css new file mode 100644 index 0000000..f465879 --- /dev/null +++ b/app/javascript/components/Spark/Spark.css @@ -0,0 +1,4 @@ +.container { + background-color: #F6F6F6; + margin: 10px; +} diff --git a/app/javascript/components/Spark/Spark.js b/app/javascript/components/Spark/Spark.js new file mode 100644 index 0000000..cee767c --- /dev/null +++ b/app/javascript/components/Spark/Spark.js @@ -0,0 +1,12 @@ +// @flow +import React from 'react'; + +import styles from './Spark.css'; + +const HEIGHT = 60; + +export const Spark = ({ series }) => ( + +); diff --git a/app/javascript/components/Spark/index.js b/app/javascript/components/Spark/index.js new file mode 100644 index 0000000..94b2e30 --- /dev/null +++ b/app/javascript/components/Spark/index.js @@ -0,0 +1 @@ +export * from './Spark'; diff --git a/app/javascript/packs/hello_react.jsx b/app/javascript/packs/hello_react.jsx index bc5ee03..a63b24e 100644 --- a/app/javascript/packs/hello_react.jsx +++ b/app/javascript/packs/hello_react.jsx @@ -11,17 +11,19 @@ import { SocketProvider } from '../components/SocketProvider'; import { Controls } from '../components/Controls'; import { Graph } from '../components/Graph'; import { Bar } from '../components/Bar'; +import { Spark } from '../components/Spark'; const Hello = () => (