Skip to content
/ rxtx Public

Queue based data collector, transmitter with store-and-forward. Useful for online/offline data collection, back pressure buffering or general queuing. REST / HTTP post.

License

Notifications You must be signed in to change notification settings

txn2/rxtx

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

83 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

rxtx data transmission irsync Release Build Status Go Report Card Maintainability GoDoc FOSSA Status

Docker Container Image Size Docker Container Layers Docker Container Pulls

rxtx

rxtx is a queue based data collector > data transmitter. Useful for online/offline data collection, back pressure buffering or general queuing. rxtx uses bbolt maintained by CoreOs, a single file database for storing messages before they can be sent.

rtbeat was developed to consume rxtx POST data and publish as events into elasticsearch, logstash, kafka, redis or directly to log files.

Test on MacOs

Install with brew

brew tap txn2/homebrew-tap
brew install rxtx

Help

rxtx -h

Test Docker Container

Help

docker run --rm -it txn2/rxtx -h

on arm 6/7 based device:

docker run --rm -it txn2/rxtx:arm32v6-1.2.0 -h

Test Source

Help

go run ./rxtx.go -h

Usage of rxtx:
  -batch int
        Batch size. (default 5000)
  -ingest string
        Ingest server. (default "http://localhost:8081/in")
  -interval int
        Seconds between intervals. (default 30)
  -maxq int
        Max number of message in queue. (default 2000000)
  -name string
        Service name. (default "rxtx")
  -path string
        Directory to store database. (default "./")
  -port string
        Server port. (default "8080")

Start server on 8080

go run ./rxtx.go 

Add message to queue

The rxtx services accepts HTTP POST data to an API endpoint in the following form /rx/PRODUCER/KEY/LABEL/.../. One label is required, however as many labels as necessary may be added, separated by a forward slash.

curl -w "\n" -d "{\"generic\": \"$RANDOM\"}" -X POST http://localhost:8080/rx/me/generic_data/generic/test/data

Add message to queue every second

 while true; do curl -w "\n" -d "{\"generic\": \"$RANDOM\"}" -X POST http://localhost:8080/rx/me/generic_data/generic/test/data; sleep 1; done

Add 1000 messages to the queue.

 time for i in {1..1000}; do curl -w "\n" -d "{\"generic\": \"$RANDOM\"}" -X POST http://localhost:8080/rx/me/generic_data/generic/test/data; done

Example Batch

{  
   "uuid":"d7642975-4241-4f43-b704-b67621b184b8",
   "size":4,
   "messages":[  
      {  
         "seq":"2019072100000000038",
         "time":"2019-07-21T02:44:49.560087Z",
         "uuid":"a93cfe42-79c5-4a32-9dc0-6b77b68d0926",
         "producer":"me",
         "label":"/generic/test/data",
         "key":"generic_data",
         "payload":{  
            "generic":"6955"
         }
      },
      {  
         "seq":"2019072100000000039",
         "time":"2019-07-21T02:44:50.6214579Z",
         "uuid":"66042da0-b0c1-447c-ab0b-66c887e8b56a",
         "producer":"me",
         "label":"/generic/test/data",
         "key":"generic_data",
         "payload":{  
            "generic":"8923"
         }
      },
      {  
         "seq":"2019072100000000040",
         "time":"2019-07-21T02:44:51.622978Z",
         "uuid":"0ec3f6d7-905f-42bc-896e-515bf8f2a06d",
         "producer":"me",
         "label":"/generic/test/data",
         "key":"generic_data",
         "payload":{  
            "generic":"28324"
         }
      },
      {  
         "seq":"2019072100000000046",
         "time":"2019-07-21T02:44:57.8990519Z",
         "uuid":"bfa2d06d-a8f4-4125-93bc-d08a070079d7",
         "producer":"me",
         "label":"/generic/test/data",
         "key":"generic_data",
         "payload":{  
            "generic":"31460"
         }
      }
   ]
}

Profile

go build ./rxtx.go && time ./rxtx --path=./data/ --cpuprofile=rxtxcpu.prof --memprofile=rxtxmem.prof

Browser-based profile viewer:

go tool pprof -http=:8081 rxtxcpu.prof

Building and Releasing

rxtx uses GORELEASER to build binaries and Docker containers.

Test Release Steps

Install GORELEASER with brew (MacOS):

brew install goreleaser/tap/goreleaser

Build without releasing:

goreleaser --skip-publish --rm-dist --skip-validate

Release Steps

  • Commit latest changes
  • Tag a version git tag -a v2.0 -m "Version 2.0"
  • Push tag git push origin v2.0
  • Run: GITHUB_TOKEN=$GITHUB_TOKEN goreleaser --rm-dist

Resources

License

FOSSA Status