Skip to content

Commit

Permalink
[SPARK-40813][CONNECT][PYTHON][FOLLOW-UP] Improve limit and offset in…
Browse files Browse the repository at this point in the history
… Python client

### What changes were proposed in this pull request?

Follow-up to #38275: improve `limit` and `offset` support in the Python client.

### Why are the changes needed?

Improve API coverage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests (UT).

Closes #38314 from amaliujia/python_test_limit_offset.

Authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
amaliujia authored and HyukjinKwon committed Oct 21, 2022
1 parent a51dd18 commit 45bb957
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 3 deletions.
3 changes: 3 additions & 0 deletions python/pyspark/sql/connect/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ def join(self, other: "DataFrame", on: Any, how: Optional[str] = None) -> "DataF
def limit(self, n: int) -> "DataFrame":
    """Return a DataFrame whose plan is a ``plan.Limit`` over this DataFrame's plan.

    ``n`` is passed through as the ``limit`` argument of the plan node; the
    new DataFrame is created with this DataFrame's session.
    """
    limit_plan = plan.Limit(child=self._plan, limit=n)
    return DataFrame.withPlan(limit_plan, session=self._session)

def offset(self, n: int) -> "DataFrame":
    """Return a DataFrame whose plan is a ``plan.Offset`` over this DataFrame's plan.

    ``n`` is passed through as the ``offset`` argument of the plan node; the
    new DataFrame is created with this DataFrame's session.
    """
    offset_plan = plan.Offset(child=self._plan, offset=n)
    return DataFrame.withPlan(offset_plan, session=self._session)

def sort(self, *cols: "ColumnOrString") -> "DataFrame":
    """Sort by the given columns.

    Wraps this DataFrame's plan in a ``plan.Sort`` built from ``cols`` and
    returns the result as a new DataFrame bound to the same session.
    """
    sort_plan = plan.Sort(self._plan, *cols)
    return DataFrame.withPlan(sort_plan, session=self._session)
Expand Down
32 changes: 29 additions & 3 deletions python/pyspark/sql/connect/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,9 @@ def _repr_html_(self) -> str:


class Limit(LogicalPlan):
def __init__(self, child: Optional["LogicalPlan"], limit: int, offset: int = 0) -> None:
def __init__(self, child: Optional["LogicalPlan"], limit: int) -> None:
super().__init__(child)
self.limit = limit
self.offset = offset

def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
assert self._child is not None
Expand All @@ -286,14 +285,41 @@ def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:

def print(self, indent: int = 0) -> str:
c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child else ""
return f"{' ' * indent}<Limit limit={self.limit} offset={self.offset}>\n{c_buf}"
return f"{' ' * indent}<Limit limit={self.limit}>\n{c_buf}"

def _repr_html_(self) -> str:
    """Return an HTML list fragment showing this node's limit and its child's representation."""
    # The template text is emitted verbatim at runtime, so it is kept exactly as written.
    child_html = self._child_repr_()
    return f"""
<ul>
<li>
<b>Limit</b><br />
Limit: {self.limit} <br />
{child_html}
</li>
</uL>
"""


class Offset(LogicalPlan):
def __init__(self, child: Optional["LogicalPlan"], offset: int = 0) -> None:
    """Initialise the node with its child plan and the offset value.

    ``child`` is handed to the base-class initialiser; ``offset`` (default 0)
    is stored on the instance as ``self.offset``.
    """
    super().__init__(child)
    self.offset = offset

def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
    """Build the ``proto.Relation`` message for this offset node.

    The child's relation is copied into ``offset.input`` and ``offset.offset``
    is set from ``self.offset``. A child plan must be present; this is
    enforced with an assertion.
    """
    assert self._child is not None
    child_relation = self._child.plan(session)

    relation = proto.Relation()
    relation.offset.input.CopyFrom(child_relation)
    relation.offset.offset = self.offset
    return relation

def print(self, indent: int = 0) -> str:
    """Render this node as an ``<Offset=...>`` line followed by the child's rendering.

    The line is prefixed with ``indent`` spaces; the child, when present, is
    rendered with the indent increased by ``LogicalPlan.INDENT``.
    """
    if self._child:
        child_text = self._child.print(indent + LogicalPlan.INDENT)
    else:
        child_text = ""
    padding = " " * indent
    return f"{padding}<Offset={self.offset}>\n{child_text}"

def _repr_html_(self) -> str:
return f"""
<ul>
<li>
<b>Limit</b><br />
Offset: {self.offset} <br />
{self._child_repr_()}
</li>
Expand Down
7 changes: 7 additions & 0 deletions python/pyspark/sql/tests/connect/test_connect_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ def test_simple_binary_expressions(self):
res = pandas.DataFrame(data={"id": [0, 30, 60, 90]})
self.assert_(pd.equals(res), f"{pd.to_string()} != {res.to_string()}")

def test_limit_offset(self):
    """Check the row counts produced when limit and offset are chained in either order."""
    table_df = self.connect.read.table(self.tbl_name)

    # limit(10) followed by offset(1) is expected to yield 9 rows.
    limit_then_offset = table_df.limit(10).offset(1).toPandas()
    self.assertEqual(9, len(limit_then_offset.index))

    # offset(98) followed by limit(10) is expected to yield 2 rows
    # (presumably the test table holds 100 rows; verify against the fixture).
    offset_then_limit = table_df.offset(98).limit(10).toPandas()
    self.assertEqual(2, len(offset_then_limit.index))

def test_simple_datasource_read(self) -> None:
writeDf = self.df_text
tmpPath = tempfile.mkdtemp()
Expand Down
10 changes: 10 additions & 0 deletions python/pyspark/sql/tests/connect/test_connect_plan_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ def test_filter(self):
self.assertEqual(plan.root.filter.condition.unresolved_function.parts, [">"])
self.assertEqual(len(plan.root.filter.condition.unresolved_function.arguments), 2)

def test_limit(self):
    """Check that ``limit(10)`` produces a proto plan whose root limit is 10."""
    table_df = self.connect.readTable(table_name=self.tbl_name)
    limited = table_df.limit(10)
    proto_plan = limited._plan.to_proto(self.connect)
    self.assertEqual(proto_plan.root.limit.limit, 10)

def test_offset(self):
    """Check that ``offset(10)`` produces a proto plan whose root offset is 10."""
    table_df = self.connect.readTable(table_name=self.tbl_name)
    shifted = table_df.offset(10)
    proto_plan = shifted._plan.to_proto(self.connect)
    self.assertEqual(proto_plan.root.offset.offset, 10)

def test_relation_alias(self):
df = self.connect.readTable(table_name=self.tbl_name)
plan = df.alias("table_alias")._plan.to_proto(self.connect)
Expand Down

0 comments on commit 45bb957

Please sign in to comment.