Skip to content

Commit

Permalink
Fixing redis
Browse files Browse the repository at this point in the history
  • Loading branch information
woody-apple committed Feb 25, 2024
1 parent 1a3da52 commit 9ac557e
Show file tree
Hide file tree
Showing 4 changed files with 825 additions and 506 deletions.
75 changes: 31 additions & 44 deletions lib/variables.js
Expand Up @@ -23,7 +23,7 @@ const prefixesToIgnore = _.isNil(prefixesToIgnoreString) ? null : prefixesToIgno
const topicStringsToIgnore = _.isNil(topicStringsToIgnoreString) ? null : topicStringsToIgnoreString.split(',')
const stuffixesToIgnore = _.isNil(stuffixesToIgnoreString) ? null : stuffixesToIgnoreString.split(',')

const isInterestingDevice = function(deviceTopic) {
const isInterestingDevice = function (deviceTopic) {
if (_.isNil(deviceTopic)) {
return false
}
Expand Down Expand Up @@ -81,7 +81,7 @@ const isInterestingDevice = function(deviceTopic) {
}


const shouldUseRedis = function() {
const shouldUseRedis = function () {
return !utilities.testMode() && !_.isNil(process.env.REDIS_HOST)
}

Expand All @@ -90,7 +90,7 @@ var redis = null
// Config
const expireAfterMinutes = process.env.EXPIRE_KEYS_AFTER_MINUTES

const connectToRedis = function(callback) {
async function connectToRedis(callback) {
if (!shouldUseRedis()) {
if (!_.isNil(callback)) {
return callback()
Expand All @@ -107,36 +107,21 @@ const connectToRedis = function(callback) {
return
}

logging.debug(' setting up redis client')
redis = Redis.createClient({
host: redisHost,
port: redisPort,
db: redisDB,
retry_strategy: function(options) {
if (options.error && options.error.code === 'ECONNREFUSED') {
// End reconnecting on a specific error and flush all commands with a individual error
return new Error('The server refused the connection')
}
if (options.total_retry_time > 1000 * 60 * 60) {
// End reconnecting after a specific timeout and flush all commands with a individual error
return new Error('Retry time exhausted')
}
if (options.times_connected > 10) {
// End reconnecting with built in error
return undefined
}
// reconnect after
return Math.min(options.attempt * 100, 3000)
logging.info(' setting up redis client')
redis = await Redis.createClient({
url: 'redis://' + redisHost + ':' + redisPort + '/' + redisDB,
socket: {
reconnectStrategy: retries => Math.min(retries * 50, 1000)
}
})

})
// redis callbacks

redis.on('error', function(err) {
redis.on('error', function (err) {
logging.error('redis error ' + err)
})

redis.on('connect', function() {
redis.on('connect', function () {
logging.info('redis connected ', {
action: 'redis-connected'
})
Expand All @@ -147,17 +132,19 @@ const connectToRedis = function(callback) {

})

redis.on('reconnect', function() {
redis.on('reconnect', function () {
logging.error('redis disconnected ', {
action: 'redis-disconnected'
})
})

await redis.connect()
}

const secondsToDefer = 10
var delayedUpdate = null

const processPendingUpdates = function() {
const processPendingUpdates = function () {
if (!shouldUseRedis()) {
return
}
Expand All @@ -184,7 +171,7 @@ const processPendingUpdates = function() {

redis.set(topic, message)
} else {
logging.debug('setting: ' + topic + ' to: ' + message)
logging.info('setting: ' + topic + ' to: ' + message)
redis.set(topic, message, 'EX', (timeToExpire * 60)) // redis takes seconds
}
})
Expand All @@ -197,14 +184,14 @@ const processPendingUpdates = function() {
Object.keys(state).forEach(topic => {
if (!observedTopics.includes(topic)) {
logging.debug('No longer need to observe, pruning:' + topic)
delete(state[topic])
delete (state[topic])
}
})

logging.debug(' => done')
}

const setPendingUpdateForTopic = function(topic) {
const setPendingUpdateForTopic = function (topic) {
if (!isInterestingDevice(topic)) {
return
}
Expand All @@ -222,12 +209,12 @@ const setPendingUpdateForTopic = function(topic) {

var initialStateLoaded = false

const _loadInitialState = function(topics, callback) {
const _loadInitialState = function (topics, callback) {
if (initialStateLoaded) {
return
}

const completion = function() {
const completion = function () {
logging.debug(' initial state loaded, sending callback: ' + JSON.stringify(state))

initialStateLoaded = true
Expand All @@ -238,10 +225,10 @@ const _loadInitialState = function(topics, callback) {

if (!utilities.testMode()) {
logging.debug(' starting connection')
connectToRedis(function() {
connectToRedis(function () {
logging.debug(' connection completed')

_updateObservedTopics(topics, function() {
_updateObservedTopics(topics, function () {
logging.debug(' done updating observed topics')
completion()
})
Expand All @@ -251,11 +238,11 @@ const _loadInitialState = function(topics, callback) {
}
}

module.exports.isInitialStateLoaded = function() {
module.exports.isInitialStateLoaded = function () {
return initialStateLoaded
}

const _updateObservedTopics = function(topics, callback) {
const _updateObservedTopics = function (topics, callback) {
observedTopics = topics

if (!shouldUseRedis()) {
Expand All @@ -282,7 +269,7 @@ const _updateObservedTopics = function(topics, callback) {
start_time: redisStartTime
})

redis.mget(topics, function(err, values) {
redis.mget(topics, function (err, values) {
const redisQueryTime = ((new Date().getTime()) - redisStartTime)
logging.debug(' redis query done', {
action: 'redis-query-done',
Expand All @@ -302,7 +289,7 @@ const _updateObservedTopics = function(topics, callback) {
try {
var jsonFound = JSON.parse(value)
if (!_.isNil(jsonFound)) {
Object.keys(jsonFound).forEach(function(key) {
Object.keys(jsonFound).forEach(function (key) {
if (_.isNil(state[key])) {
state[key] = jsonFound[key]
}
Expand All @@ -326,21 +313,21 @@ const _updateObservedTopics = function(topics, callback) {
})
}

module.exports.updateObservedTopics = function(topics, callback) {
module.exports.updateObservedTopics = function (topics, callback) {
if (initialStateLoaded) {
_updateObservedTopics(topics, callback)
} else {
_loadInitialState(topics, callback)
}
}

module.exports.clearState = function() {
module.exports.clearState = function () {
state = {}
pendingTopicUpdates = []
observedTopics = []
}

module.exports.update = function(topic, message) {
module.exports.update = function (topic, message) {
if (_.isNil(topic) || _.isNil(message)) {
return
}
Expand All @@ -353,7 +340,7 @@ module.exports.update = function(topic, message) {
}
}

module.exports.valueForTopic = function(topic) {
module.exports.valueForTopic = function (topic) {
if (_.isNil(topic)) {
return null
}
Expand All @@ -363,7 +350,7 @@ module.exports.valueForTopic = function(topic) {
return state[topic]
}

module.exports.valuesForTopics = function(topics) {
module.exports.valuesForTopics = function (topics) {
if (_.isNil(topics)) {
return null
}
Expand Down

0 comments on commit 9ac557e

Please sign in to comment.