[BugFix] fix timestamp underflow for parquet format (#18521)
dirtysalt committed Mar 3, 2023
1 parent be0cb08 commit e256919
Showing 6 changed files with 130 additions and 60 deletions.
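
A note on what the diff below changes (inferred from the code, not stated in the commit message): the Parquet INT64 timestamp path used to go through timestamp::of_epoch_second(int seconds, int microseconds), whose 32-bit parameters truncate any instant earlier than 1901-12-13 (epoch seconds below INT32_MIN). The commit widens those parameters to int64_t and converts INT64 values through TimestampValue::from_unixtime with the session time zone. A minimal illustration of the narrowing, using a value taken from the new test data:

    #include <cstdint>
    #include <iostream>

    int main() {
        // 1900-01-01 00:00:00 Asia/Shanghai expressed as epoch seconds (taken from the test data further down).
        const int64_t epoch_1900 = -2209017943;
        // What the old `int` parameter received: the value is below INT32_MIN,
        // so it wraps (implementation-defined before C++20).
        const int32_t narrowed = static_cast<int32_t>(epoch_1900);
        std::cout << epoch_1900 << " -> " << narrowed << "\n";  // -2209017943 -> 2085949353 on typical builds
        return 0;
    }
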
37 changes: 11 additions & 26 deletions be/src/formats/parquet/column_converter.cpp
@@ -75,22 +75,11 @@ class Int64ToDateTimeConverter : public ColumnConverter {
~Int64ToDateTimeConverter() override = default;

Status init(const std::string& timezone, const tparquet::SchemaElement& schema_element);
Status convert(const ColumnPtr& src, Column* dst) override { return _convert_to_timestamp_column(src, dst); }

private:
// convert column from int64 to timestamp
Status _convert_to_timestamp_column(const ColumnPtr& src, Column* dst);
// When Hive stores a timestamp value into Parquet format, it converts local time
// into UTC time, and when it reads data out, it should be converted to the time
// according to session variable "time_zone".
[[nodiscard]] Timestamp _utc_to_local(Timestamp timestamp) const {
return timestamp::add<TimeUnit::SECOND>(timestamp, _offset);
}
Status convert(const ColumnPtr& src, Column* dst) override;

private:
bool _is_adjusted_to_utc = false;
int _offset = 0;

cctz::time_zone _ctz;
int64_t _second_mask = 0;
int64_t _scale_to_nano_factor = 0;
};
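
An observation on the removed _offset members above (not text from the commit): the old converter captured one UTC offset from the current wall clock and added it to every value, which cannot be correct for zones whose offset changes over time (DST or historical rule changes). Keeping the whole cctz::time_zone lets each value be converted with the rules in force at that instant. A minimal sketch, assuming cctz and the system tzdata are available:

    #include <chrono>
    #include <iostream>
    #include <cctz/time_zone.h>

    int main() {
        cctz::time_zone tz;
        if (!cctz::load_time_zone("America/New_York", &tz)) return 1;
        // Two instants in the same year fall under different offsets (EST vs. EDT),
        // so a single offset captured "now" is wrong for roughly half of the values.
        const auto winter = std::chrono::system_clock::from_time_t(1673000000);  // 2023-01-06
        const auto summer = std::chrono::system_clock::from_time_t(1688200000);  // 2023-07-01
        std::cout << tz.lookup(winter).offset << " vs " << tz.lookup(summer).offset << "\n";  // -18000 vs -14400
        return 0;
    }
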
@@ -594,7 +583,7 @@ Status Int96ToDateTimeConverter::convert(const ColumnPtr& src, Column* dst) {
for (size_t i = 0; i < size; i++) {
dst_null_data[i] = src_null_data[i];
if (!src_null_data[i]) {
Timestamp timestamp = (static_cast<uint64_t>(src_data[i].hi) << 40u) | (src_data[i].lo / 1000);
Timestamp timestamp = (static_cast<uint64_t>(src_data[i].hi) << TIMESTAMP_BITS) | (src_data[i].lo / 1000);
dst_data[i].set_timestamp(_utc_to_local(timestamp));
}
}
@@ -604,7 +593,6 @@ Status Int96ToDateTimeConverter::convert(const ColumnPtr& src, Column* dst) {

Status Int64ToDateTimeConverter::init(const std::string& timezone, const tparquet::SchemaElement& schema_element) {
DCHECK_EQ(schema_element.type, tparquet::Type::INT64);

if (schema_element.__isset.logicalType) {
if (!schema_element.logicalType.__isset.TIMESTAMP) {
std::stringstream ss;
@@ -616,6 +604,7 @@ Status Int64ToDateTimeConverter::init(const std::string& timezone, const tparque
_is_adjusted_to_utc = schema_element.logicalType.TIMESTAMP.isAdjustedToUTC;

const auto& time_unit = schema_element.logicalType.TIMESTAMP.unit;

if (time_unit.__isset.MILLIS) {
_second_mask = 1000;
_scale_to_nano_factor = 1000000;
@@ -650,20 +639,15 @@ Status Int64ToDateTimeConverter::init(const std::string& timezone, const tparque
}

if (_is_adjusted_to_utc) {
cctz::time_zone ctz;
if (!TimezoneUtils::find_cctz_time_zone(timezone, ctz)) {
if (!TimezoneUtils::find_cctz_time_zone(timezone, _ctz)) {
return Status::InternalError(strings::Substitute("can not find cctz time zone $0", timezone));
}

const auto tp = std::chrono::system_clock::now();
const cctz::time_zone::absolute_lookup al = ctz.lookup(tp);
_offset = al.offset;
}

return Status::OK();
}

Status Int64ToDateTimeConverter::_convert_to_timestamp_column(const ColumnPtr& src, Column* dst) {
Status Int64ToDateTimeConverter::convert(const ColumnPtr& src, Column* dst) {
auto* src_nullable_column = ColumnHelper::as_raw_column<NullableColumn>(src);
// hive only support null column
// TODO: support not null
@@ -682,10 +666,11 @@ Status Int64ToDateTimeConverter::_convert_to_timestamp_column(const ColumnPtr& s
for (size_t i = 0; i < size; i++) {
dst_null_data[i] = src_null_data[i];
if (!src_null_data[i]) {
Timestamp timestamp =
timestamp::of_epoch_second(static_cast<int>(src_data[i] / _second_mask),
static_cast<int>((src_data[i] % _second_mask) * _scale_to_nano_factor));
dst_data[i].set_timestamp(_utc_to_local(timestamp));
int64_t seconds = src_data[i] / _second_mask;
int64_t nanoseconds = (src_data[i] % _second_mask) * _scale_to_nano_factor;
TimestampValue ep;
ep.from_unixtime(seconds, nanoseconds / 1000, _ctz);
dst_data[i].set_timestamp(ep.timestamp());
}
}
dst_nullable_column->set_has_null(src_nullable_column->has_null());
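
On the _second_mask / _scale_to_nano_factor pair used by the new convert() above: the hunk only shows the MILLIS case (mask 1000, nano factor 1000000); by the same pattern MICROS and NANOS presumably use 1000000 / 1000 and 1000000000 / 1, so the mask is units per second and the factor scales the remainder to nanoseconds. A small sketch of the split for a MICROS value (the fractional part is hypothetical, added only to make the remainder visible):

    #include <cstdint>
    #include <cstdio>

    int main() {
        const int64_t second_mask = 1000000;        // MICROS: units per second (assumed by analogy with MILLIS)
        const int64_t scale_to_nano_factor = 1000;  // micro -> nano
        const int64_t raw = 1672502400123456;       // the 2023 row from the test data plus a hypothetical .123456 s
        const int64_t seconds = raw / second_mask;                               // 1672502400
        const int64_t nanoseconds = (raw % second_mask) * scale_to_nano_factor;  // 123456000
        std::printf("%lld s + %lld ns\n", (long long)seconds, (long long)nanoseconds);
        return 0;
    }
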
13 changes: 6 additions & 7 deletions be/src/runtime/time_types.h
@@ -18,7 +18,6 @@
#include <string>

#include "common/compiler_util.h"

namespace starrocks {

// Date: Julian Date -2000-01-01 ~ 9999-01-01
@@ -72,6 +71,7 @@ static const int64_t USECS_PER_MINUTE = 60000000;
static const int64_t USECS_PER_SEC = 1000000;

static const int64_t NANOSECS_PER_USEC = 1000;
static const int64_t NANOSECS_PER_SEC = 1000000000;

// Corresponding to TimeUnit
static constexpr int64_t USECS_PER_UNIT[] = {
@@ -208,7 +208,7 @@ class timestamp {

inline static double time_to_literal(double time);

inline static Timestamp of_epoch_second(int seconds, int microseconds);
inline static Timestamp of_epoch_second(int64_t seconds, int64_t microseconds);

public:
// MAX_DATE | USECS_PER_DAY
@@ -376,11 +376,10 @@ double timestamp::time_to_literal(double time) {
return hour * 10000 + minute * 100 + second;
}

Timestamp timestamp::of_epoch_second(int seconds, int nanoseconds) {
int days = seconds / SECS_PER_DAY;
JulianDate jd = days + date::UNIX_EPOCH_JULIAN;
return timestamp::from_julian_and_time(jd,
seconds % SECS_PER_DAY * USECS_PER_SEC + nanoseconds / NANOSECS_PER_USEC);
Timestamp timestamp::of_epoch_second(int64_t seconds, int64_t nanoseconds) {
int64_t second = seconds + timestamp::UNIX_EPOCH_SECONDS;
JulianDate day = second / SECS_PER_DAY;
return timestamp::from_julian_and_time(day, second % SECS_PER_DAY * USECS_PER_SEC + nanoseconds / NANOSECS_PER_USEC);
}

struct JulianToDateEntry {
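
Why the rewritten of_epoch_second() adds UNIX_EPOCH_SECONDS before dividing (reasoning not spelled out in the commit): C++ integer division truncates toward zero, so for negative epoch seconds the old seconds / SECS_PER_DAY rounds the day toward the epoch and leaves a negative time-of-day remainder; shifting to a non-negative Julian-based count first keeps both quotient and remainder well-behaved. A worked illustration, with UNIX_EPOCH_SECONDS assumed to equal UNIX_EPOCH_JULIAN * SECS_PER_DAY:

    #include <cstdint>
    #include <iostream>

    int main() {
        const int64_t SECS_PER_DAY = 86400;
        const int64_t UNIX_EPOCH_JULIAN = 2440588;                            // Julian day of 1970-01-01
        const int64_t UNIX_EPOCH_SECONDS = UNIX_EPOCH_JULIAN * SECS_PER_DAY;  // assumed definition

        const int64_t seconds = -1;  // 1969-12-31 23:59:59 UTC
        // Old style: truncation toward zero stays on the epoch day with a negative remainder.
        std::cout << "old: day offset " << seconds / SECS_PER_DAY            // 0 (should be -1)
                  << ", remainder " << seconds % SECS_PER_DAY << " s\n";     // -1
        // New style: shift first, then divide; the value lands on the previous Julian day.
        const int64_t shifted = seconds + UNIX_EPOCH_SECONDS;
        std::cout << "new: julian day " << shifted / SECS_PER_DAY            // 2440587 (1969-12-31)
                  << ", time of day " << shifted % SECS_PER_DAY << " s\n";   // 86399
        return 0;
    }
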
12 changes: 9 additions & 3 deletions be/src/types/timestamp_value.cpp
@@ -816,17 +816,23 @@ bool TimestampValue::from_unixtime(int64_t second, const std::string& timezone) {
if (!TimezoneUtils::find_cctz_time_zone(timezone, ctz)) {
return false;
}
return from_unixtime(second, ctz);
from_unixtime(second, ctz);
return true;
}

bool TimestampValue::from_unixtime(int64_t second, const cctz::time_zone& ctz) {
void TimestampValue::from_unixtime(int64_t second, const cctz::time_zone& ctz) {
static const cctz::time_point<cctz::sys_seconds> epoch =
std::chrono::time_point_cast<cctz::sys_seconds>(std::chrono::system_clock::from_time_t(0));
cctz::time_point<cctz::sys_seconds> t = epoch + cctz::seconds(second);

const auto tp = cctz::convert(t, ctz);
from_timestamp(tp.year(), tp.month(), tp.day(), tp.hour(), tp.minute(), tp.second(), 0);
return true;
}

void TimestampValue::from_unixtime(int64_t second, int64_t microsecond, const cctz::time_zone& ctz) {
from_unixtime(second, ctz);
_timestamp += microsecond;
return;
}

void TimestampValue::from_unix_second(int64_t second) {
3 changes: 2 additions & 1 deletion be/src/types/timestamp_value.h
@@ -114,7 +114,8 @@ class TimestampValue {
int64_t to_unix_second() const;

bool from_unixtime(int64_t second, const std::string& timezone);
bool from_unixtime(int64_t second, const cctz::time_zone& ctz);
void from_unixtime(int64_t second, const cctz::time_zone& ctz);
void from_unixtime(int64_t second, int64_t microsecond, const cctz::time_zone& ctz);

void from_unix_second(int64_t second);

125 changes: 102 additions & 23 deletions be/test/exec/hdfs_scanner_test.cpp
@@ -58,6 +58,7 @@ class HdfsScannerTest : public ::testing::Test {
RuntimeProfile* _runtime_profile = nullptr;
RuntimeState* _runtime_state = nullptr;
std::string _debug_row_output;
int _debug_rows_per_call = 1;
};

void HdfsScannerTest::_create_runtime_profile() {
@@ -305,29 +306,32 @@ static void extend_partition_values(ObjectPool* pool, HdfsScannerParams* params,
params->partition_values = part_values;
}

#define READ_SCANNER_RETURN_ROWS(scanner, records) \
do { \
_debug_row_output = ""; \
ChunkPtr chunk = ChunkHelper::new_chunk(*tuple_desc, 0); \
for (;;) { \
chunk->reset(); \
status = scanner->get_next(_runtime_state, &chunk); \
if (status.is_end_of_file()) { \
break; \
} \
if (!status.ok()) { \
std::cout << "status not ok: " << status.get_error_msg() << std::endl; \
break; \
} \
chunk->check_or_die(); \
if (chunk->num_rows() > 0) { \
_debug_row_output += chunk->debug_row(0); \
_debug_row_output += '\n'; \
std::cout << "row#0: " << chunk->debug_row(0) << std::endl; \
EXPECT_EQ(chunk->num_columns(), tuple_desc->slots().size()); \
} \
records += chunk->num_rows(); \
} \
#define READ_SCANNER_RETURN_ROWS(scanner, records) \
do { \
_debug_row_output = ""; \
ChunkPtr chunk = ChunkHelper::new_chunk(*tuple_desc, 0); \
for (;;) { \
chunk->reset(); \
status = scanner->get_next(_runtime_state, &chunk); \
if (status.is_end_of_file()) { \
break; \
} \
if (!status.ok()) { \
std::cout << "status not ok: " << status.get_error_msg() << std::endl; \
break; \
} \
chunk->check_or_die(); \
if (chunk->num_rows() > 0) { \
int rep = std::min(_debug_rows_per_call, (int)chunk->num_rows()); \
for (int i = 0; i < rep; i++) { \
std::cout << "row#" << i << ": " << chunk->debug_row(i) << std::endl; \
_debug_row_output += chunk->debug_row(i); \
_debug_row_output += '\n'; \
} \
EXPECT_EQ(chunk->num_columns(), tuple_desc->slots().size()); \
} \
records += chunk->num_rows(); \
} \
} while (0)

#define READ_SCANNER_ROWS(scanner, exp) \
@@ -2046,4 +2050,79 @@ TEST_F(HdfsScannerTest, TestHudiMORArrayMapStruct2) {
}
}

// =============================================================================
/*
spark-sql> select * from test_hudi_mor13;
_hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name a b ts uuid
20230216103800029 20230216103800029_0_0 aa1 14260209-008c-4170-bed3-5533d8783f0f-0_0-167-153_20230216103800029.parquet 300 3023-01-01 00:00:00 40 aa1
20230216103800029 20230216103800029_0_1 aa0 14260209-008c-4170-bed3-5533d8783f0f-0_0-167-153_20230216103800029.parquet 200 2023-01-01 00:00:00 30 aa0
20230216103800029 20230216103800029_0_2 aa2 14260209-008c-4170-bed3-5533d8783f0f-0_0-167-153_20230216103800029.parquet 400 1000-01-01 00:00:00 50 aa2
20230216103800029 20230216103800029_0_3 aa 14260209-008c-4170-bed3-5533d8783f0f-0_0-167-153_20230216103800029.parquet 100 1900-01-01 00:00:00 20 aa
*/

/*
_hoodie_commit_time = 20230216103800029
_hoodie_commit_seqno = 20230216103800029_0_0
_hoodie_record_key = aa1
_hoodie_partition_path =
_hoodie_file_name = 14260209-008c-4170-bed3-5533d8783f0f-0_0-167-153_20230216103800029.parquet
a = 300
b = 33229411200000000
ts = 40
uuid = aa1
_hoodie_commit_time = 20230216103800029
_hoodie_commit_seqno = 20230216103800029_0_1
_hoodie_record_key = aa0
_hoodie_partition_path =
_hoodie_file_name = 14260209-008c-4170-bed3-5533d8783f0f-0_0-167-153_20230216103800029.parquet
a = 200
b = 1672502400000000
ts = 30
uuid = aa0
_hoodie_commit_time = 20230216103800029
_hoodie_commit_seqno = 20230216103800029_0_2
_hoodie_record_key = aa2
_hoodie_partition_path =
_hoodie_file_name = 14260209-008c-4170-bed3-5533d8783f0f-0_0-167-153_20230216103800029.parquet
a = 400
b = -30610253143000000
ts = 50
uuid = aa2
_hoodie_commit_time = 20230216103800029
_hoodie_commit_seqno = 20230216103800029_0_3
_hoodie_record_key = aa
_hoodie_partition_path =
_hoodie_file_name = 14260209-008c-4170-bed3-5533d8783f0f-0_0-167-153_20230216103800029.parquet
a = 100
b = -2209017943000000
ts = 20
uuid = aa
*/

TEST_F(HdfsScannerTest, TestParquetTimestampToDatetime) {
SlotDesc parquet_descs[] = {{"b", TypeDescriptor::from_logical_type(LogicalType::TYPE_DATETIME)}, {""}};

const std::string parquet_file = "./be/test/exec/test_data/parquet_scanner/timestamp_to_datetime.parquet";

_create_runtime_state("Asia/Shanghai");
auto scanner = std::make_shared<HdfsParquetScanner>();
auto* range = _create_scan_range(parquet_file, 0, 0);
auto* tuple_desc = _create_tuple_desc(parquet_descs);
auto* param = _create_param(parquet_file, range, tuple_desc);

_debug_rows_per_call = 10;
Status status = scanner->init(_runtime_state, *param);
EXPECT_TRUE(status.ok());
status = scanner->open(_runtime_state);
EXPECT_TRUE(status.ok());
READ_SCANNER_ROWS(scanner, 4);
EXPECT_EQ(scanner->raw_rows_read(), 4);
EXPECT_EQ(_debug_row_output,
"[3023-01-01 00:00:00]\n[2023-01-01 00:00:00]\n[1000-01-01 00:00:00]\n[1900-01-01 00:00:00]\n");
scanner->close(_runtime_state);
}

} // namespace starrocks
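
A quick cross-check of the new test's expectations (not part of the commit): the b values in the dump above are UTC-adjusted microseconds, so dividing by 1,000,000 gives epoch seconds. For example -2209017943 s is 1900-01-01 00:00:00 in Asia/Shanghai (the two pre-1901 rows appear to carry Shanghai's historical LMT offset of +08:05:43, which is why they are not multiples of 86400), and it lies below INT32_MIN, exactly the kind of value that used to underflow. A sketch that reproduces the four expected strings with cctz, assuming the system tzdata is available:

    #include <chrono>
    #include <cstdint>
    #include <iostream>
    #include <cctz/time_zone.h>

    int main() {
        cctz::time_zone tz;
        if (!cctz::load_time_zone("Asia/Shanghai", &tz)) return 1;
        // The four `b` values in the order of the dump above (microseconds since the epoch).
        const int64_t micros[] = {33229411200000000, 1672502400000000, -30610253143000000, -2209017943000000};
        // Use a seconds-resolution time_point (as the patched from_unixtime does) so the
        // year-3023 value does not overflow a nanosecond-resolution system_clock duration.
        const auto epoch = std::chrono::time_point_cast<cctz::seconds>(std::chrono::system_clock::from_time_t(0));
        for (const int64_t us : micros) {
            const auto tp = epoch + cctz::seconds(us / 1000000);
            // Expected: 3023-01-01, 2023-01-01, 1000-01-01, 1900-01-01, all at 00:00:00.
            std::cout << cctz::format("%Y-%m-%d %H:%M:%S", tp, tz) << "\n";
        }
        return 0;
    }
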
Binary file not shown.
