Skip to content

Commit

Permalink
Merge pull request #63 from treasure-data/PLT-11303
Browse files Browse the repository at this point in the history
[Mixpanel][Input] Update the authentication method to latest
  • Loading branch information
hieuduong89 committed Jan 4, 2019
2 parents f66288d + 6411615 commit b29245b
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 76 deletions.
9 changes: 2 additions & 7 deletions lib/embulk/input/mixpanel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def self.transaction(config, &control)
dates: range,
timezone: timezone,
export_endpoint: export_endpoint(config),
api_key: config.param(:api_key, :string),
api_secret: config.param(:api_secret, :string),
schema: config.param(:columns, :array),
fetch_unknown_columns: fetch_unknown_columns,
Expand Down Expand Up @@ -129,8 +128,7 @@ def self.guess(config)
retry_initial_wait_sec: config.param(:retry_initial_wait_sec, :integer, default: 1),
retry_limit: config.param(:retry_limit, :integer, default: 5),
})
client = MixpanelApi::Client.new(config.param(:api_key, :string),
config.param(:api_secret, :string),
client = MixpanelApi::Client.new(config.param(:api_secret, :string),
retryer,
export_endpoint(config))

Expand Down Expand Up @@ -162,7 +160,6 @@ def self.export_endpoint(config)

def init
@export_endpoint = task[:export_endpoint]
@api_key = task[:api_key]
@api_secret = task[:api_secret]
@params = task[:params]
@timezone = task[:timezone]
Expand Down Expand Up @@ -310,7 +307,7 @@ def fetch(dates, last_fetch_time, &block)
)
end
Embulk.logger.info "Where params is #{params["where"]}"
client = MixpanelApi::Client.new(@api_key, @api_secret, self.class.perfect_retry(task), @export_endpoint)
client = MixpanelApi::Client.new(@api_secret, self.class.perfect_retry(task), @export_endpoint)

if preview?
client.export_for_small_dataset(params)
Expand Down Expand Up @@ -342,9 +339,7 @@ def preview?
def self.export_params(config)
event = config.param(:event, :array, default: nil)
event = event.nil? ? nil : event.to_json

{
api_key: config.param(:api_key, :string),
event: event,
where: config.param(:where, :string, default: nil),
bucket: config.param(:bucket, :string, default: nil),
Expand Down
28 changes: 5 additions & 23 deletions lib/embulk/input/mixpanel_api/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def self.mixpanel_available?(endpoint = nil)
begin
retryer.with_retry do
client = HTTPClient.new
client.force_basic_auth = true
client.connect_timeout = PING_TIMEOUT_SECONDS
client.set_auth(nil, @api_secret, nil)
client.get(URI.join(endpoint, '/'))
end
true
Expand All @@ -38,9 +40,8 @@ def self.mixpanel_available?(endpoint = nil)
end
end

def initialize(api_key, api_secret, retryer = nil, endpoint = DEFAULT_EXPORT_ENDPOINT)
def initialize(api_secret, retryer = nil, endpoint = DEFAULT_EXPORT_ENDPOINT)
@endpoint = endpoint
@api_key = api_key
@api_secret = api_secret
@retryer = retryer || PerfectRetry.new do |config|
# for test
Expand Down Expand Up @@ -100,7 +101,6 @@ def response_to_enum(response_body)
def request(params, &block)
# https://mixpanel.com/docs/api-documentation/exporting-raw-data-you-inserted-into-mixpanel
Embulk.logger.debug "Export param: #{params.to_s}"
set_signatures(params)

buf = ""
error_response = ''
Expand Down Expand Up @@ -133,7 +133,6 @@ def request_small_dataset(params, num_of_records)
# guess/preview
# Try to fetch first number of records
params["limit"] = num_of_records
set_signatures(params)
Embulk.logger.info "Sending request to #{@endpoint}"
res = httpclient.get(@endpoint, params)
handle_error(res,res.body)
Expand All @@ -154,25 +153,6 @@ def handle_error(response, error_response)
end
end

def set_signatures(params)
params[:expire] ||= Time.now.to_i + TIMEOUT_SECONDS
params[:sig] = signature(params)
params
end

def signature(params)
# https://mixpanel.com/docs/api-documentation/data-export-api#auth-implementation
params.delete(:sig)
sorted_keys = params.keys.map(&:to_s).sort
signature = sorted_keys.inject("") do |sig, key|
value = params[key] || params[key.to_sym]
next sig unless value
sig << "#{key}=#{value}"
end

Digest::MD5.hexdigest(signature + @api_secret)
end

def httpclient
@client ||=
begin
Expand All @@ -181,6 +161,8 @@ def httpclient
client.tcp_keepalive = true
client.default_header = {Accept: "application/json; charset=UTF-8"}
# client.debug_dev = STDERR
client.force_basic_auth = true
client.set_auth(nil, @api_secret, nil);
client
end
end
Expand Down
58 changes: 27 additions & 31 deletions test/embulk/input/mixpanel_api/test_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,28 @@ module MixpanelApi
class ClientTest < Test::Unit::TestCase
include OverrideAssertRaise

API_KEY = "api_key".freeze
API_SECRET = "api_secret".freeze

def setup
@client = Client.new(API_KEY, API_SECRET)
@client = Client.new(API_SECRET)
stub(Embulk).logger { ::Logger.new(IO::NULL) }
end

# NOTE: Client#signature is private method but this value
# can't be checked via other methods.
def test_signature
now = Time.parse("2015-07-22 00:00:00")
stub(Time).now { now }

params = {
string: "string",
array: ["elem1", "elem2"],
}
expected = "4be4a4f92f57e12b543a2a5f2f5897b6"

assert_equal(expected, @client.__send__(:signature, params))
end

class TestKeepAlive < self
def test_tcp_keepalive_enabled
client = Client.new(API_KEY, API_SECRET)
client = Client.new(API_SECRET)
assert client.send(:httpclient).tcp_keepalive
end
end

class TryToDatesTest < self
def setup
@client = Client.new(API_KEY, API_SECRET)
@client = Client.new(API_SECRET)
end

data do
[
["2000-01-01", "2000-01-01"],
["2010-01-01", "2010-01-01"],
["3 days ago", (Date.today - 3).to_s],
]
end
def test_candidates(from_str)

def test_candidates_case1
from_str = "2000-01-01"
from = Date.parse(from_str)
yesterday = Date.today - 1
dates = @client.try_to_dates(from.to_s)
Expand All @@ -66,6 +45,24 @@ def test_candidates(from_str)
assert_equal expect, dates
assert dates.all?{|d| d <= yesterday}
end

def test_candidates_case2
from_str = (Date.today - 3).to_s
from = Date.parse(from_str)
yesterday = Date.today - 1
dates = @client.try_to_dates(from.to_s)
expect = [
from + 1,
from + 10,
from + 100,
from + 1000,
from + 10000,
yesterday,
].find_all{|d| d <= yesterday}

assert_equal expect, dates
assert dates.all?{|d| d <= yesterday}
end
end

class ExportTest < self
Expand All @@ -77,7 +74,7 @@ def setup

def test_success
stub_client
stub(@client).set_signatures(anything) {}
# stub(@client).set_signatures(anything) {}
stub_response(success_response)

records = []
Expand All @@ -90,7 +87,7 @@ def test_success

def test_export_partial_with_export_terminated_early
stub_client
stub(@client).set_signatures(anything) {}
# stub(@client).set_signatures(anything) {}
stub_response(Struct.new(:code, :body).new(200, jsonl_dummy_responses+"\nexport terminated early"))

records = []
Expand All @@ -104,7 +101,7 @@ def test_export_partial_with_export_terminated_early

def test_export_partial_with_error_json
stub_client
stub(@client).set_signatures(anything) {}
# stub(@client).set_signatures(anything) {}
stub_response(Struct.new(:code, :body).new(200, jsonl_dummy_responses+"\n{\"error\":"))
records = []
assert_raise MixpanelApi::IncompleteExportResponseError do
Expand Down Expand Up @@ -203,7 +200,6 @@ def failure_response(code)

def params
{
"api_key" => API_KEY,
"api_secret" => API_SECRET,
"from_date" => "2015-01-01",
"to_date" => "2015-03-02",
Expand Down
15 changes: 0 additions & 15 deletions test/embulk/input/test_mixpanel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ module Input
class MixpanelTest < Test::Unit::TestCase
include OverrideAssertRaise

API_KEY = "api_key".freeze
API_SECRET = "api_secret".freeze
FROM_DATE = "2015-02-22".freeze
TO_DATE = "2015-03-02".freeze
Expand All @@ -31,7 +30,6 @@ def setup

def setup_client
params = {
api_key: API_KEY,
event: nil,
where: nil,
bucket: nil,
Expand Down Expand Up @@ -72,7 +70,6 @@ def setup
def test_from_date_old_date
config = {
type: "mixpanel",
api_key: API_KEY,
api_secret: API_SECRET,
from_date: FROM_DATE,
}
Expand All @@ -87,7 +84,6 @@ def test_from_date_old_date
def test_from_date_future
config = {
type: "mixpanel",
api_key: API_KEY,
api_secret: API_SECRET,
timezone: TIMEZONE,
from_date: (today + 1).to_s
Expand All @@ -103,7 +99,6 @@ def test_from_date_yesterday
from_date = (today - 1).to_s
config = {
type: "mixpanel",
api_key: API_KEY,
api_secret: API_SECRET,
from_date: from_date,
}
Expand All @@ -117,7 +112,6 @@ def test_from_date_yesterday
def test_no_from_date
config = {
type: "mixpanel",
api_key: API_KEY,
api_secret: API_SECRET,
timezone: TIMEZONE
}
Expand All @@ -141,7 +135,6 @@ def test_mixpanel_is_down
stub(Embulk::Input::MixpanelApi::Client).mixpanel_available? { false }
config = {
type: "mixpanel",
api_key: API_KEY,
api_secret: API_SECRET,
}

Expand Down Expand Up @@ -300,7 +293,6 @@ def test_invalid_timezone
def transaction_task(timezone)
task.merge(
dates: DATES.map {|date| date.to_s},
api_key: API_KEY,
api_secret: API_SECRET,
incremental: true,
incremental_column: nil,
Expand Down Expand Up @@ -366,7 +358,6 @@ def transaction_task(days)
from_date = Date.parse(FROM_DATE)
task.merge(
dates: (from_date..(from_date + days - 1)).map {|date| date.to_s},
api_key: API_KEY,
api_secret: API_SECRET,
timezone: TIMEZONE,
schema: schema
Expand Down Expand Up @@ -434,7 +425,6 @@ def control
def transaction_task
task.merge(
dates: DATES.map {|date| date.to_s},
api_key: API_KEY,
api_secret: API_SECRET,
timezone: TIMEZONE,
schema: schema
Expand All @@ -451,7 +441,6 @@ def columns
def test_export_params
config_params = [
:type, "mixpanel",
:api_key, API_KEY,
:api_secret, API_SECRET,
:from_date, FROM_DATE,
:to_date, TO_DATE,
Expand All @@ -463,7 +452,6 @@ def test_export_params
config = DataSource[*config_params]

expected = {
api_key: API_KEY,
event: "[\"ViewHoge\",\"ViewFuga\"]",
where: 'properties["$os"] == "Windows"',
bucket: "987",
Expand Down Expand Up @@ -541,7 +529,6 @@ def stub_response(code)

def task
{
api_key: API_KEY,
api_secret: API_SECRET,
export_endpoint: "https://data.mixpanel.com/api/2.0/export/",
timezone: TIMEZONE,
Expand Down Expand Up @@ -846,7 +833,6 @@ def schema

def task
{
api_key: API_KEY,
api_secret: API_SECRET,
export_endpoint: "https://data.mixpanel.com/api/2.0/export/",
timezone: TIMEZONE,
Expand Down Expand Up @@ -891,7 +877,6 @@ def record_epoch
def config
{
type: "mixpanel",
api_key: API_KEY,
api_secret: API_SECRET,
from_date: FROM_DATE,
fetch_days: DAYS,
Expand Down

0 comments on commit b29245b

Please sign in to comment.