校园春色亚洲色图_亚洲视频分类_中文字幕精品一区二区精品_麻豆一区区三区四区产品精品蜜桃

主頁 > 知識庫 > MLSQL Stack如何讓流調試更加簡單詳解

MLSQL Stack如何讓流調試更加簡單詳解

熱門標簽:昌德訊外呼系統 中國地圖標注公司 百度地圖標注要什么軟件 天津公司外呼系統軟件 400電話申請廠家現貨 福建外呼電銷機器人加盟 電話機器人的價格多少錢一個月 自己做地圖標注需要些什么 徐涇鎮騰訊地圖標注

前言

有一位同學正在調研MLSQL Stack對流的支持。然后說了流調試其實挺困難的。經過實踐,希望實現如下三點:

  • 能隨時查看最新固定條數的Kafka數據
  • 調試結果(sink)能打印在web控制臺
  • 流程序能自動推測json schema(現在spark是不行的)

實現這三個點之后,我發現調試確實就變得簡單很多了。

流程

首先我新建了一個kaf_write.mlsql,里面方便我往Kafka里寫數據:

set abc='''
{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
''';
load jsonStr.`abc` as table1;

select to_json(struct(*)) as value from table1 as table2;
save append table2 as kafka.`wow` where 
kafka.bootstrap.servers="127.0.0.1:9092";

這樣我每次運行,數據就能寫入到Kafka.

接著,我寫完后,需要看看數據是不是真的都寫進去了,寫成了什么樣子:

!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;

這句話表示,我要采樣Kafka 10條Kafka數據,該Kafka的地址為127.0.0.1:9092,主題為wow.運行結果如下:

沒有什么問題。接著我寫一個非常簡單的流式程序:

-- the stream name, should be uniq.
set streamName="streamExample";

-- use kafkaTool to infer schema from kafka
!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;


load kafka.`wow` options 
kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;


select * from newkafkatable1
as table21;


-- print in webConsole instead of terminal console.
save append table21 
as webConsole.`` 
options mode="Append"
and duration="15"
and checkpointLocation="/tmp/s-cpl4";

運行結果如下:

在終端我們也可以看到實時效果了。

補充

當然,MLSQL Stack 還有對流還有兩個特別好地方,第一個是你可以對流的事件設置http協議的callback,以及對流的處理結果再使用批SQL進行處理,最后入庫。參看如下腳本:

-- the stream name, should be uniq.
set streamName="streamExample";


-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';

-- load data as table
load jsonStr.`data` as datasource;

-- convert table as stream source
load mockStream.`datasource` options 
stepSizeRange="0-3"
as newkafkatable1;

-- aggregation 
select cast(value as string) as k from newkafkatable1
as table21;


!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- output the the result to console.


save append table21 
as custom.`` 
options mode="append"
and duration="15"
and sourceTable="jack"
and code='''
select count(*) as c from jack as newjack;
save append newjack as parquet.`/tmp/jack`; 
'''
and checkpointLocation="/tmp/cpl15";

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對腳本之家的支持。

您可能感興趣的文章:
  • Mysql LONGBLOB 類型存儲二進制數據 (修改+調試+整理)
  • Mysql LONGTEXT 類型存儲大文件(二進制也可以) (修改+調試+整理)
  • Mysql 插入中文及中文查詢 (修改+調試)
  • 新手配置 PHP 調試環境(IIS+PHP+MYSQL)
  • MySQL UDF調試方式debugview的相關方法
  • 分享101個MySQL調試與優化技巧
  • GDB調試Mysql實戰之源碼編譯安裝

標簽:駐馬店 昌都 北京 黔西 梅河口 鄂爾多斯 荊門 陜西

巨人網絡通訊聲明:本文標題《MLSQL Stack如何讓流調試更加簡單詳解》,本文關鍵詞  MLSQL,Stack,如何,讓,流,調試,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《MLSQL Stack如何讓流調試更加簡單詳解》相關的同類信息!
  • 本頁收集關于MLSQL Stack如何讓流調試更加簡單詳解的相關信息資訊供網民參考!
  • 推薦文章
    主站蜘蛛池模板: 翁牛特旗| 神农架林区| 通江县| 东乡族自治县| 兴城市| 深水埗区| 威信县| 新巴尔虎右旗| 仙游县| 梁河县| 新巴尔虎左旗| 浦北县| 馆陶县| 金沙县| 靖宇县| 和林格尔县| 山阳县| 阳城县| 大埔区| 聂荣县| 敦煌市| 稻城县| 辽源市| 广平县| 汨罗市| 紫云| 长治县| 泸州市| 无极县| 玉环县| 南安市| 万源市| 云霄县| 桐城市| 崇仁县| 鄄城县| 铜陵市| 潜山县| 酒泉市| 灵川县| 黎城县|