Skip to content

Commit dce379f

Browse files
authored
feat(ws): support blob type (#338)
* style(ws): format code * feat(ws): support blob type * test: add blob test case * test: add TEST_TD_3360 env * chore: switch to taos main branch * style: format code
1 parent dd301fd commit dce379f

10 files changed

Lines changed: 186 additions & 44 deletions

File tree

.github/workflows/taos-ws-py-compatibility.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ jobs:
103103
TDENGINE_URL: localhost:6041
104104
WS_CLOUD_URL: ${{ secrets.WS_CLOUD_URL }}
105105
WS_CLOUD_TOKEN: ${{ secrets.WS_CLOUD_TOKEN }}
106+
TEST_TD_3360: "true"
106107
run: |
107108
pip3 install pytest
108109
pytest ./taos-ws-py/tests/

taos-ws-py/Cargo.lock

Lines changed: 22 additions & 32 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

taos-ws-py/src/common.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ pub fn to_py_datetime(dt: chrono::NaiveDateTime, py: Python) -> PyResult<PyObjec
2525
Ok(datetime.call(args, None)?.into_py(py))
2626
}
2727

28-
// pub fn datetime_to_py(t: chrono py: Python)
2928
pub unsafe fn get_row_of_block_unchecked(py: Python, block: &RawBlock, index: usize) -> PyObject {
3029
let mut vec = Vec::new();
3130
for i in 0..block.ncols() {
@@ -58,7 +57,7 @@ pub unsafe fn get_row_of_block_unchecked(py: Python, block: &RawBlock, index: us
5857
BorrowedValue::Geometry(v) => v.into_py(py),
5958
BorrowedValue::Decimal64(v) => v.to_string().into_py(py),
6059
BorrowedValue::Decimal(v) => v.to_string().into_py(py),
61-
BorrowedValue::Blob(_) => todo!(),
60+
BorrowedValue::Blob(v) => v.into_py(py),
6261
BorrowedValue::MediumBlob(_) => todo!(),
6362
};
6463
vec.push(val);

taos-ws-py/src/consumer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ impl Consumer {
175175
Ok(topics)
176176
}
177177

178-
///
178+
/// Poll for a message with an optional timeout.
179179
pub fn poll(&mut self, timeout: Option<f64>) -> PyResult<Option<Message>> {
180180
let timeout = if let Some(timeout) = timeout {
181181
Timeout::Duration(Duration::from_secs_f64(timeout))

taos-ws-py/src/cursor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,6 @@ impl Cursor {
205205
let vec: Vec<_> = seq_of_parameters
206206
.iter()?
207207
.map(|row| -> PyResult<String> {
208-
// let params = row.extract().unwrap();
209208
let row = row?;
210209
if row.is_instance_of::<PyDict>()? {
211210
let local = PyDict::new(py);

taos-ws-py/src/lib.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ impl TaosResult {
266266
BorrowedValue::Json(j) => std::str::from_utf8(&j).unwrap().into_py(py),
267267
BorrowedValue::VarBinary(v) => v.as_ref().into_py(py),
268268
BorrowedValue::Geometry(v) => v.as_ref().into_py(py),
269+
BorrowedValue::Blob(v) => v.as_ref().into_py(py),
269270
_ => Option::<()>::None.into_py(py),
270271
};
271272
vec.push(value);
@@ -991,6 +992,18 @@ fn geometry_to_column(values: Vec<Option<Vec<u8>>>) -> PyColumnView {
991992
}
992993
}
993994

995+
#[pyfunction]
996+
fn blob_to_column(values: Vec<Option<Vec<u8>>>) -> PyColumnView {
997+
PyColumnView {
998+
_inner: ColumnView::from_blob_bytes::<
999+
Vec<u8>,
1000+
Option<Vec<u8>>,
1001+
std::vec::IntoIter<Option<Vec<u8>>>,
1002+
Vec<Option<Vec<u8>>>,
1003+
>(values),
1004+
}
1005+
}
1006+
9941007
#[pymodule]
9951008
fn taosws(py: Python<'_>, m: &PyModule) -> PyResult<()> {
9961009
if std::env::var("RUST_LOG").is_ok() {
@@ -1051,6 +1064,7 @@ fn taosws(py: Python<'_>, m: &PyModule) -> PyResult<()> {
10511064
m.add_function(wrap_pyfunction!(binary_to_column, m)?)?;
10521065
m.add_function(wrap_pyfunction!(varbinary_to_column, m)?)?;
10531066
m.add_function(wrap_pyfunction!(geometry_to_column, m)?)?;
1067+
m.add_function(wrap_pyfunction!(blob_to_column, m)?)?;
10541068
m.add_function(wrap_pyfunction!(stmt2_bind_param_view, m)?)?;
10551069

10561070
m.add("apilevel", API_LEVEL)?;

taos-ws-py/tests/test_blob.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import datetime
2+
import time
3+
import taosws
4+
from taosws import Consumer
5+
import os
6+
7+
8+
def test_blob_sql():
9+
value = os.getenv("TEST_TD_3360")
10+
if value is not None:
11+
return
12+
13+
conn = taosws.connect("ws://localhost:6041")
14+
cursor = conn.cursor()
15+
16+
try:
17+
cursor.execute("drop database if exists test_1753269319")
18+
cursor.execute("create database test_1753269319")
19+
cursor.execute("use test_1753269319")
20+
cursor.execute("create table t0(ts timestamp, c1 blob)")
21+
cursor.execute("insert into t0 values(1752218982761, null)")
22+
cursor.execute("insert into t0 values(1752218982762, '')")
23+
cursor.execute("insert into t0 values(1752218982763, 'hello')")
24+
cursor.execute("insert into t0 values(1752218982764, '\\x12345678')")
25+
26+
cursor.execute("select * from t0")
27+
rows = cursor.fetchall()
28+
29+
assert len(rows) == 4
30+
31+
assert rows[0][0].timestamp() * 1000 == 1752218982761
32+
assert rows[1][0].timestamp() * 1000 == 1752218982762
33+
assert rows[2][0].timestamp() * 1000 == 1752218982763
34+
assert rows[3][0].timestamp() * 1000 == 1752218982764
35+
36+
assert rows[0][1] is None
37+
assert rows[1][1] == b""
38+
assert rows[2][1] == b"hello"
39+
assert rows[3][1] == b"\x124Vx"
40+
41+
finally:
42+
cursor.execute("drop database test_1753269319")
43+
conn.close()
44+
45+
46+
def test_blob_stmt2():
47+
value = os.getenv("TEST_TD_3360")
48+
if value is not None:
49+
return
50+
51+
conn = taosws.connect("ws://localhost:6041")
52+
try:
53+
conn.execute("drop database if exists test_1753269333")
54+
conn.execute("create database test_1753269333")
55+
conn.execute("use test_1753269333")
56+
conn.execute("create table t0 (ts timestamp, c1 blob)")
57+
58+
test_timestamps = [1726803356466, 1726803356467, 1726803356468, 1726803356469]
59+
test_blobs = [None, b"", b"hello", b"\x124Vx"]
60+
61+
stmt2 = conn.stmt2_statement()
62+
stmt2.prepare("insert into t0 values (?, ?)")
63+
64+
param = taosws.stmt2_bind_param_view(
65+
table_name="",
66+
tags=None,
67+
columns=[
68+
taosws.millis_timestamps_to_column(test_timestamps),
69+
taosws.blob_to_column(test_blobs),
70+
],
71+
)
72+
stmt2.bind([param])
73+
74+
affected_rows = stmt2.execute()
75+
assert affected_rows == 4
76+
77+
stmt2.prepare("select * from t0 where ts > ?")
78+
79+
param = taosws.stmt2_bind_param_view(
80+
table_name="",
81+
tags=None,
82+
columns=[taosws.millis_timestamps_to_column([1726803356465])],
83+
)
84+
stmt2.bind([param])
85+
stmt2.execute()
86+
87+
result = stmt2.result_set()
88+
expected_results = list(zip(test_timestamps, test_blobs))
89+
90+
actual_results = []
91+
for row in result:
92+
dt = datetime.datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S.%f %z")
93+
timestamp = int(dt.timestamp() * 1000)
94+
actual_results.append((timestamp, row[1]))
95+
96+
assert actual_results == expected_results
97+
98+
finally:
99+
conn.execute("drop database test_1753269333")
100+
conn.close()
101+
102+
103+
def test_blob_tmq():
104+
value = os.getenv("TEST_TD_3360")
105+
if value is not None:
106+
return
107+
108+
conn = taosws.connect("ws://localhost:6041")
109+
try:
110+
conn.execute("drop topic if exists topic_1753270984")
111+
conn.execute("drop database if exists test_1753270984")
112+
conn.execute("create database test_1753270984")
113+
conn.execute("create topic topic_1753270984 as database test_1753270984")
114+
conn.execute("use test_1753270984")
115+
conn.execute("create table t0 (ts timestamp, c1 int, c2 blob)")
116+
117+
num = 100
118+
119+
sql = "insert into t0 values "
120+
for i in range(num):
121+
ts = 1726803356466 + i
122+
sql += f"({ts}, {i}, 'blob_{i}'), "
123+
conn.execute(sql)
124+
125+
consumer = Consumer(dsn="ws://localhost:6041?group.id=10&auto.offset.reset=earliest")
126+
consumer.subscribe(["topic_1753270984"])
127+
128+
cnt = 0
129+
130+
while 1:
131+
message = consumer.poll(timeout=5.0)
132+
if message:
133+
for block in message:
134+
cnt += block.nrows()
135+
consumer.commit(message)
136+
else:
137+
break
138+
139+
assert cnt == num
140+
141+
consumer.unsubscribe()
142+
143+
finally:
144+
time.sleep(3)
145+
conn.execute("drop topic topic_1753270984")
146+
conn.execute("drop database test_1753270984")
147+
conn.close()

taos-ws-py/tests/test_commit_offset.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ def prepare(topic_name):
3737
"create table `table` (ts timestamp, v int)",
3838
]
3939
for statement in statements:
40-
# print(statement)
4140
cursor.execute(statement)
4241

4342

@@ -50,7 +49,6 @@ def clear(topic_name):
5049
f"drop database if exists {dbname}",
5150
]
5251
for statement in statements:
53-
# print(statement)
5452
cursor.execute(statement)
5553

5654

0 commit comments

Comments
 (0)