
QStreaming: A Lightweight Big Data ETL Development Framework


QStreaming is a framework that simplifies writing and executing ETLs on top of Apache Spark

 

It is based on a simple SQL-like configuration file and runs on any Spark cluster.

 

Architecture

 

 

QStreaming is built on top of Apache Spark and is mainly made up of the following components:

 

Pipeline DSL

A configuration file that defines the queries of the ETL pipeline. It is made up of input tables, metric statements, data quality check rules (optional), and output tables.

 

Pipeline DSL Parser

A parser that parses the Pipeline DSL using the ANTLR parser generator and builds the pipeline domain models.

 

Pipeline Translator

A translator that translates the pipeline produced by the Pipeline DSL Parser into Spark transformations.

 

Data Quality Checker

The data quality checker verifies/measures intermediate or final datasets according to the data quality check rules defined in the Pipeline DSL file.

 

Pipeline Runner

The Pipeline Runner schedules and runs the pipeline as a Spark batch/streaming application.

 

Getting started

 

To run QStreaming you must first define two files.

 

Pipeline DSL

 

For example, a simple pipeline DSL file looks as follows:

 

-- DDL for a streaming input which connects to a Kafka topic
-- It declares five fields based on the JSON data format. In addition, it uses ROWTIME() to declare a virtual column that generates the event time attribute from the existing ts field.
create stream input table user_behavior(
  user_id LONG,
  item_id LONG,
  category_id LONG,
  behavior STRING,
  ts TIMESTAMP,
  eventTime as ROWTIME(ts,'1 minutes')
) using kafka(
  kafka.bootstrap.servers="localhost:9091",
  startingOffsets=earliest,
  subscribe="user_behavior",
  "group-id"="user_behavior"
);
-- DDL for streaming output which connect to a kafka topic
create stream output table behavior_cnt_per_hour
using kafka(
  kafka.bootstrap.servers="localhost:9091",
  topic="behavior_cnt_per_hour"
)TBLPROPERTIES(
  outputMode="update",
  checkpointLocation = "behavior_cnt_per_hour"
);
-- CREATE VIEW counts the number of records for each behavior in each one-minute window.
create view v_behavior_cnt_per_hour as
SELECT
   window(eventTime, "1 minutes") as window,
   COUNT(*) as behavior_cnt,
   behavior
FROM user_behavior
GROUP BY
  window(eventTime, "1 minutes"),
  behavior;

--  persist result to kafka
insert into behavior_cnt_per_hour
select
   from_unixtime(cast(window.start as LONG),'yyyy-MM-dd HH:mm') as time,
   behavior_cnt,
   behavior
from
  v_behavior_cnt_per_hour;

 

Application configuration properties

 

There are only two config options currently available.

 

Run QStreaming

 

There are three ways to run QStreaming; the first step is to get the latest released JAR from here.

 

Run on a yarn cluster

 

Running on a cluster requires Apache Spark v2.2+.

Run the following command:

$SPARK_HOME/bin/spark-submit \
--class com.qiniu.stream.core.Streaming \
--master yarn \
--deploy-mode client \
stream-standalone-0.0.3-jar-with-dependencies.jar  \
-j pipeline.dsl

 

Run on a standalone cluster

 

To run on a standalone cluster, you must first start a Spark standalone cluster and then run the following command:

 

$SPARK_HOME/bin/spark-submit \
--class com.qiniu.stream.core.Streaming \
--master spark://IP:PORT \
stream-standalone-0.0.3-jar-with-dependencies.jar \
-j pipeline.dsl

 

Run as a library

 

It’s also possible to use QStreaming inside your own project

 

To use it, add the dependency to your project:

 

maven

<dependency>
  <groupId>com.qiniu</groupId>
  <artifactId>stream-core</artifactId>
  <version>0.0.3</version>
</dependency>

 

gradle

compile 'com.qiniu:stream-core:0.0.3'

 

sbt

libraryDependencies += "com.qiniu" % "stream-core" % "0.0.3"
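
Once the dependency is on your classpath, a minimal sketch of launching a pipeline from your own code might look like the following. It assumes that com.qiniu.stream.core.Streaming, the same entry point used with spark-submit above, exposes a standard main method and accepts the same -j argument; check the project source for the exact programmatic API.

import com.qiniu.stream.core.Streaming

object MyEtlJob {
  // Delegate to QStreaming's command-line entry point with the same
  // arguments otherwise passed via spark-submit (assumption: Streaming
  // provides a standard main(Array[String]) entry point).
  def main(args: Array[String]): Unit = {
    Streaming.main(Array("-j", "pipeline.dsl"))
  }
}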

 

Datasources

 

The following data sources are supported as input:

Kafka (streaming) with json/regex/csv/avro format
HDFS/S3 with csv/json/text/parquet/avro storage format
JDBC data source
MongoDB
Apache HBase

and the following data sources as output:

Kafka
Elasticsearch
Apache HBase
MongoDB
JDBC data source
HDFS/S3 with csv/json/text/parquet/avro storage format
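
The sketch below shows how a couple of these connectors might be declared in the Pipeline DSL. It is only illustrative: the jdbc option names follow Spark's built-in JDBC source (url, dbtable, user, password), and the batch output form mirrors the stream output syntax shown elsewhere on this page; the actual connector names and options accepted by QStreaming may differ.

-- hypothetical JDBC batch input; option names follow Spark's JDBC source
create batch input table orders
using jdbc(
  url="jdbc:mysql://localhost:3306/shop",
  dbtable="orders",
  user="etl",
  password="***"
);

-- assumed batch output to HDFS/S3 in parquet format, mirroring the stream output syntax
create batch output table order_report
using parquet(path="hdfs://cluster1/reports/orders");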

Features

 

DDL support for stream processing

 

create stream input table user_behavior(
  user_id LONG,
  item_id LONG,
  category_id LONG,
  behavior STRING,
  ts TIMESTAMP,
  eventTime as ROWTIME(ts,'1 minutes')
) using kafka(
  kafka.bootstrap.servers="localhost:9091",
  startingOffsets=earliest,
  subscribe="user_behavior",
  "group-id"="user_behavior"
);

 

The above DDL statement defines an input which connects to a Kafka topic.

 

For detailed information, please refer to CreateSourceTableStatement for how to define an input and CreateSinkTableStatement for how to define an output.

 

Watermark support in SQL

 

QStreaming supports watermarks, which help a stream processing engine deal with late data.

 

There are two ways to define a watermark for the stream processing engine:

 

Adding ROWTIME(eventTimeField, delayThreshold) as a schema property in a DDL statement:

create stream input table user_behavior(
  user_id LONG,
  item_id LONG,
  category_id LONG,
  behavior STRING,
  ts TIMESTAMP,
  eventTime as ROWTIME(ts,'1 minutes')
) using kafka(
  kafka.bootstrap.servers="localhost:9091",
  startingOffsets=earliest,
  subscribe="user_behavior",
  "group-id"="user_behavior"
);

The above example declares eventTime as the event time attribute, derived from the ts field, with a 1 minute delay threshold.

 

Adding waterMark = "eventTimeField, delayThreshold" as a view property in a view statement:

create view v_behavior_cnt_per_hour(waterMark = "eventTime, 1 minutes") as
SELECT
   window(eventTime, "1 minutes") as window,
   COUNT(*) as behavior_cnt,
   behavior
FROM user_behavior
GROUP BY
  window(eventTime, "1 minutes"),
  behavior;

 

The above example defines a watermark using the eventTime field with a 1 minute threshold.

 

Dynamic user-defined function

 

-- define UDF named hello
def hello(name:String) = {
   s"hello ${name}"
};

 

QStreaming allows defining a dynamic UDF inside job.dsl. For more detailed information, please refer to createFunctionStatement.

 

The above example defines a UDF with a string parameter.
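
Once defined, the UDF can be referenced in later statements like any built-in function. A minimal sketch follows; the view name and column usage are illustrative only and reuse the user_behavior table declared earlier.

-- use the dynamically defined hello UDF in a subsequent view (illustrative)
create view v_greetings as
select hello(behavior) as greeting
from user_behavior;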

 

Multiple sinks for a streaming application

 

create stream output table output using hbase(
        quorum = 'test1:2181,test2:2181,test3:2181',
        tableName = 'buy_cnt_per_hour',
        rowKey = '<hour_of_day>',
        cf = 'cf',
        fields = '[{"qualified":"buy_cnt","value":"behavior_cnt","type":"LongType"}]',
        where = 'behavior="buy"'
    ),hbase(
        quorum = 'test1:2181,test2:2181,test3:2181',
        tableName = 'order_cnt_per_hour',
        rowKey = '<hour_of_day>',
        cf = 'cf',
        fields = '[{"qualified":"order_cnt","value":"behavior_cnt","type":"LongType"}]',
        where = 'behavior="order"'
    ) TBLPROPERTIES (outputMode = update,checkpointLocation = "behavior_output");

 

QStreaming allows you to define multiple outputs for the streaming/batch processing engine by leveraging foreachBatch mode (only available in Spark >= 2.4.0).

 

The above example sinks the behavior count metric to two HBase tables. For more information about how to create multiple sinks, please refer to createSinkTableStatement.

 

Variable interpolation

 

create batch input table raw_log
USING parquet(path="hdfs://cluster1/logs/day=<day>/hour=<hour>");

 

The job.dsl file supports variable interpolation from command line arguments, which is useful for running QStreaming as a periodic job.

 

For example, you can pass the values for theDayThatRunAJob and theHourThatRunAJob from an Airflow DAG:

 

$SPARK_HOME/bin/spark-submit \
--name {{.dir}} \
--class com.qiniu.stream.core.Streaming \
--master yarn \
--deploy-mode client \
--conf spark.executor.extraClassPath=./ \
stream-standalone-0.0.3-jar-with-dependencies.jar \
-j pipeline.dsl \
-c stream.template.vars.day=theDayThatRunAJob \
-c stream.template.vars.hour=theHourThatRunAJob

 

Kafka lag monitor

 

QStreaming allows monitoring the Kafka topic offset lag by adding the "group-id" connector property in the DDL statement, as below:

 

create stream input table user_behavior(
  user_id LONG,
  item_id LONG,
  category_id LONG,
  behavior STRING,
  ts TIMESTAMP,
  eventTime as ROWTIME(ts,'1 minutes')
) using kafka(
  kafka.bootstrap.servers="localhost:9091",
  startingOffsets=earliest,
  subscribe="user_behavior",
  "group-id"="user_behavior"
);

 

Data Quality Check

 

The purpose is to “unit-test” data to find errors early, before the data gets fed to any storage.

 

For example, we test for the following properties of the data:

 

the dataset has 5 rows in total
id is never null and is unique
productName is never null
priority only contains the values "high" and "low"
numViews does not contain negative values
at least half of the values in description contain a URL
the median of numViews is less than or equal to 10
 

In the DSL, this looks as follows:

 

CREATE TEST testName(testLevel=Error,testOutput=testResult) on dataset WITH 
   numRows()=5 and 
   isNotNull(id) and 
   isUnique(id) and 
   isComplete(productName) and 
   isContainedIn(priority, ["high", "low"]) and 
   isNonNegative(numViews)  and 
   containsUrl(description) >= 0.5 and 
   hasApproxQuantile(numViews, 0.5) <= 10

 

Contributing

 

We welcome all kinds of contribution, including bug reports, feature requests, documentation improvements, UI refinements, etc.

 

Thanks to all contributors!

 

License

 

See the LICENSE file for license rights and limitations (Apache License).

 

Join QStreaming WeChat Group

 

Join Gitter room

 

