Apache Griffin debugging: a record of bugs and fixes

1. The database reports an error: Table 'quartz.DATACONNECTOR' doesn't exist

2021-01-18 14:54:54.135 ERROR 122541 --- [http-nio-8081-exec-8] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.TransactionSystemException: Could not commit JPA transaction; nested exception is javax.persistence.RollbackException: Exception [EclipseLink-4002] (Eclipse Persistence Services - 2.6.0.v20150309-bf26070): org.eclipse.persistence.exceptions.DatabaseException
Internal Exception: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'quartz.DATACONNECTOR' doesn't exist
Error Code: 1146
Call: INSERT INTO DATACONNECTOR (ID, CONFIG, CREATEDDATE, DATAFRAMENAME, DATATIMEZONE, DATAUNIT, MODIFIEDDATE, NAME, TYPE, VERSION) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
	bind => [7, {"database":"griffin_demo","table.name":"demo_tgt","where":"dt=#YYYYMMdd# AND hour=#HH#"}, 1610952894112, null, GMT+8, 1hour, null, target1610952607162, HIVE, 1.2]
Query: InsertObjectQuery(DataConnector{name=target1610952607162, type=HIVE, version='1.2', config={"database":"griffin_demo","table.name":"demo_tgt","where":"dt=#YYYYMMdd# AND hour=#HH#"}})]
with root cause
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'quartz.DATACONNECTOR' doesn't exist

The exact cause depends on the situation, but most likely the column being mapped is declared longer than the database allows, so JPA fails to create the table in the first place, and every later insert into it fails. The fix is to map the oversized config field to a TEXT column:

@JsonIgnore
@Transient
private String defaultDataUnit = "365000d";

// Add columnDefinition = "TEXT" so MySQL creates a TEXT column instead of an oversized VARCHAR
@JsonIgnore
@Column(length = 20480, columnDefinition = "TEXT")
private String config;

@Transient
private Map<String, Object> configMap;

The DataConnector entity lives at service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java.
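
As a quick sanity check after rebuilding and restarting the service, you can confirm that JPA managed to create the table this time. This assumes the quartz schema from the error message and a root login; adjust both to your setup.

# if the table now exists, the CONFIG column should show up as TEXT
mysql -u root -p -e "SHOW CREATE TABLE quartz.DATACONNECTOR\G"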

2. "no main manifest attribute" in spring-boot-01-helloworld-1.0-SNAPSHOT.jar

When compiling and installing the Griffin source with Maven, the resulting jar may complain that it has no main manifest attribute. Add the Spring Boot build plugin to the pom and re-run mvn install.

<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
      <executions>
        <execution>
          <goals>
            <goal>repackage</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Griffin 0.7.0 actually ships this plugin already; just set the goal to repackage, and remember to run clean before install so everything is recompiled.
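
For reference, the rebuild is simply a clean followed by install from the Griffin source root (skipping tests is optional but common when packaging):

# re-run the build so the repackage goal produces an executable jar
mvn clean install -DskipTests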

3. Livy reports an error: Could not find Livy jars directory

This most likely means the wrong package was downloaded. The incubating-livy source package from the official website contains no jars at all; download and install the binary ("-bin") distribution of livy-server instead, which ships the jars directory.
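
A quick way to tell the two packages apart, assuming LIVY_HOME points at the unpacked directory (an assumption for the example): the binary distribution ships a jars directory, which is what the startup script is looking for, while the source package does not.

# should list the Livy server jars; if the directory is missing, you have the source package
ls "$LIVY_HOME/jars"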

4. MySQL reports an error: Failed to open file 'xxx.sql'

Under Linux, MySQL resolves source paths relative to the directory where the mysql client was started. For example, if mysql was launched from /usr/local/tomcat, then source sqlfile/xxx.sql; tells MySQL to look for /usr/local/tomcat/sqlfile/xxx.sql. So when you want to load a .sql file with source, first cd to the directory that contains it (or use an absolute path), then start mysql and run source xxx.sql;
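
A minimal illustration, using /opt/sqlfile as a stand-in path for wherever the script actually lives:

# change into the directory that holds the script, then source it from inside mysql
cd /opt/sqlfile
mysql -u root -p
# at the mysql> prompt:
#   source xxx.sql;

Alternatively, feeding the file to the client directly avoids the relative-path issue altogether: mysql -u root -p < /opt/sqlfile/xxx.sql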

5. Spark startup error: java.net.ConnectException: Call From xxx to xxx:8020 failed on connection exception: Connection refused

1. Make sure the firewall is off, otherwise many of the ports you configured will be unreachable:

systemctl stop firewalld
systemctl status firewalld
systemctl disable firewalld

2. Make sure /etc/hosts maps the cluster servers' IP addresses to their hostnames; otherwise you have to spell out the full IP every time instead of the hostname:

vim /etc/hosts

192.168.239.131 hadoop101
192.168.239.132 hadoop102
192.168.239.133 hadoop103

3. Check $HADOOP_HOME/etc/hadoop/core-site.xml (or hdfs-site.xml) and confirm the NameNode hostname and, above all, the port HDFS listens on. That port must match the one used by spark.eventLog.dir in spark/conf/spark-defaults.conf and by -Dspark.history.fs.logDirectory in spark/conf/spark-env.sh; 8020 on one side and 9000 on the other will obviously never line up.

# spark/conf/spark-env.sh
-Dspark.history.fs.logDirectory=hdfs://hadoop101:9000/spark_directory

# spark/conf/spark-defaults.conf
spark.eventLog.dir hdfs://hadoop101:9000/spark_directory

<!-- $HADOOP_HOME/etc/hadoop/core-site.xml: specify the address of the NameNode in HDFS -->
<property>
  <name>fs.defaultFS</name>
  <!-- hdfs is the protocol, hadoop101 is the NameNode host, and 9000 is the port -->
  <value>hdfs://hadoop101:9000</value>
</property>
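
To double-check which address and port the NameNode is actually configured with, Hadoop can print the effective value directly (a general sanity check, not a Griffin-specific step):

# prints the effective fs.defaultFS, e.g. hdfs://hadoop101:9000
hdfs getconf -confKey fs.defaultFS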

6. | xargs kill never seems to kill the process

When building a big data cluster you often write shell scripts to start or stop groups of processes. My flume stop script could never kill the flume consumer process, possibly because the Kafka it reads from had already been shut down. Whatever the cause, appending -9 to the batched | xargs kill forcibly kills the processes:

"stop" ){ for i in hadoop103 do echo "--------Stop $i consumption flume-------" ssh $i "ps -ef | grep kafka-flume-hdfs | grep- v grep |awk'{print/$2 }'| xargs kill -9" done };; Copy code

Briefly: ps -ef lists all processes; grep filters them by the given pattern, and grep -v inverts the match (here removing the grep process itself); awk '{print $2}' prints the second field, which is the process ID; and xargs kill -9 passes those IDs to kill, forcibly killing them in batch.
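
A cautious habit before wiring kill -9 into a script is to run the same pipeline without the kill and eyeball the PIDs it selects:

# prints the PIDs that would be killed, without killing anything
ps -ef | grep kafka-flume-hdfs | grep -v grep | awk '{print $2}'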

7. Elasticsearch 5.2 startup error

Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c5330000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 986513408 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/elasticsearch/hs_err_pid1675.log

Elasticsearch 5.2 allocates a 1 GB JVM heap by default, and my virtual machine did not have that much memory to spare, so I reduced the heap settings:

# vim config/jvm.options
-Xms1g
-Xmx1g

changed to

-Xms512m
-Xmx512m

Spark also has JVM memory settings; as a memory-based compute engine it inevitably eats memory at runtime. Under test conditions you can lower Spark's memory (for example set spark.driver.memory in spark/conf/spark-defaults.conf to something like 512m), otherwise it may error out. That said, it is better to give the virtual machine cluster more memory, especially the host running the Hadoop NameNode, which has to hold a large amount of metadata; if it also runs compute tasks, give it even more.
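
For reference, a minimal sketch of the test-environment setting mentioned above; 512m is just the example value from the text, and spark.executor.memory can be lowered the same way if needed:

# spark/conf/spark-defaults.conf
spark.driver.memory    512m
spark.executor.memory  512m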

8. Wrong Scala version

Griffin's measure code is written in Scala, and Spark itself is built on Scala, so when you run spark-submit for a Griffin data quality job it uses the Scala bundled with Spark, which may not match the Scala version pinned in Griffin's pom. This mismatch is easy to overlook and is also mentioned on the Griffin website. Since the parent pom of the latest griffin-0.7.0 pins Scala 2.11, install Spark 2.3 or 2.4; don't use Spark 3.0, which bundles Scala 2.12 and will fail at runtime. The relevant property is scala.binary.version in pom.xml:

<properties>
  <encoding>UTF-8</encoding>
  <project.build.sourceEncoding>${encoding}</project.build.sourceEncoding>
  <project.reporting.outputEncoding>${encoding}</project.reporting.outputEncoding>
  <java.version>1.8</java.version>
  <scala.binary.version>2.11</scala.binary.version>
  <scala211.binary.version>2.11</scala211.binary.version>
  <scala.version>${scala.binary.version}.0</scala.version>
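
One quick way to confirm which Scala version the installed Spark actually bundles, since the spark-submit version banner includes it:

# prints a line like "Using Scala version 2.11.12" for Spark 2.4.x
spark-submit --version 2>&1 | grep -i "scala version"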

9. java.lang.AssertionError: assertion failed: Connector is undefined or invalid

21/06/28 18:54:28 ERROR measure.Application$: assertion failed: Connector is undefined or invalid
java.lang.AssertionError: assertion failed: Connector is undefined or invalid
	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam.validate(DQConfig.scala:100)
	at org.apache.griffin.measure.configuration.dqdefinition.DQConfig$$anonfun$validate$5.apply(DQConfig.scala:74)
	at org.apache.griffin.measure.configuration.dqdefinition.DQConfig$$anonfun$validate$5.apply(DQConfig.scala:74)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.griffin.measure.configuration.dqdefinition.DQConfig.validate(DQConfig.scala:74)
	at org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReader$class.validate(ParamReader.scala:43)
	at org.apache.griffin.measure.configuration.dqdefinition.reader.ParamFileReader.validate(ParamFileReader.scala:33)
	at org.apache.griffin.measure.configuration.dqdefinition.reader.ParamFileReader$$anonfun$readConfig$1.apply(ParamFileReader.scala:40)
	at org.apache.griffin.measure.configuration.dqdefinition.reader.ParamFileReader$$anonfun$readConfig$1.apply(ParamFileReader.scala:36)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.griffin.measure.configuration.dqdefinition.reader.ParamFileReader.readConfig(ParamFileReader.scala:36)
	at org.apache.griffin.measure.Application$.readParamFile(Application.scala:127)
	at org.apache.griffin.measure.Application$.main(Application.scala:61)
	at org.apache.griffin.measure.Application.main(Application.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
21/06/28 18:54:28 INFO util.ShutdownHookManager: Shutdown hook called
21/06/28 18:54:28 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f62d0e16-1189-49ce-9c4c-565ff330dfb8

In the dq.json on the official website, the data.sources connector field is written as "connectors" (with an s) and therefore takes a "[ ]" array, representing multiple connectors. Sometimes, though, the job fails with an error to the effect that Object[].class needs to be converted to Object.class, meaning it only accepts a single connector. If you hit that error, drop the s, write the field as "connector", and remove the "[ ]", so that only one connector is configured:

"process.type" : "batch" , "data.sources" : [ { "name" : "src" , "baseline" : true , "connector" : { "type" : "hive" , "version" : "3.1" , "config" : { "database" : "default" , "table.name" : " demo_src " } } }, Copy code

10. Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found

Error message:

Caused by: java.lang.RuntimeException: Error in configuring object
	at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:112)
	at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:78)
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
	at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:190)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	... (many repeated RDD.partitions / MapPartitionsRDD.getPartitions / UnionRDD.getPartitions frames as Spark walks the RDD lineage) ...
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:321)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 88 more
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
	... 139 more
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
	at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:139)
	at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:180)
	at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
	... 144 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
	at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:132)
	... 146 more

The easiest fix is to copy hadoop-lzo-0.4.20.jar into Spark's jars directory.
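
A sketch of that copy; the source path below is just an example location under a Hadoop installation, and $SPARK_HOME is assumed to point at the Spark install:

# put the LZO codec jar on Spark's classpath, then re-submit the job
cp /opt/module/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar "$SPARK_HOME/jars/"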

11. ERROR curator.ConnectionState: Connection timed out for connection string (zk:2181) and timeout (15000)/elapsed (47080)

21/06/28 19:00:24 WARN curator.ConnectionState: Connection attempt unsuccessful after 67036 (greater than max timeout of 60000). Resetting connection and trying again with a new connection.
21/06/28 19:00:24 ERROR offset.OffsetCheckpointInZK: delete /lock error: zk: unknown name or service
21/06/28 19:00:39 ERROR curator.ConnectionState: Connection timed out for connection string (zk:2181) and timeout (15000) / elapsed (15059)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
	at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197)
	at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87)
	at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115)
	at org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148)
	at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
	at org.apache.curator.utils.EnsurePath$InitialHelper.ensure(EnsurePath.java:141)
	at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99)
	at org.apache.curator.framework.imps.NamespaceImpl.fixForNamespace(NamespaceImpl.java:74)
	at org.apache.curator.framework.imps.CuratorFrameworkImpl.fixForNamespace(CuratorFrameworkImpl.java:574)
	at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:194)
	at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:41)
	at org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointInZK.org$apache$griffin$measure$context$streaming$checkpoint$offset$OffsetCheckpointInZK$$delete(OffsetCheckpointInZK.scala:204)
	at org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointInZK$$anonfun$delete$1.apply(OffsetCheckpointInZK.scala:124)
	at org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointInZK$$anonfun$delete$1.apply(OffsetCheckpointInZK.scala:123)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointInZK.delete(OffsetCheckpointInZK.scala:123)
	at org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointInZK.clear(OffsetCheckpointInZK.scala:130)
	at org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointInZK.init(OffsetCheckpointInZK.scala:90)
	at org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient$$anonfun$init$1.apply(OffsetCheckpointClient.scala:34)
	at org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient$$anonfun$init$1.apply(OffsetCheckpointClient.scala:34)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient$.init(OffsetCheckpointClient.scala:34)
	at org.apache.griffin.measure.launch.streaming.StreamingDQApp$$anonfun$init$1.apply$mcV$sp(StreamingDQApp.scala:70)
	at org.apache.griffin.measure.launch.streaming.StreamingDQApp$$anonfun$init$1.apply(StreamingDQApp.scala:55)
	at org.apache.griffin.measure.launch.streaming.StreamingDQApp$$anonfun$init$1.apply(StreamingDQApp.scala:55)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.griffin.measure.launch.streaming.StreamingDQApp.init(StreamingDQApp.scala:55)
	at org.apache.griffin.measure.Application$.main(Application.scala:82)
	at org.apache.griffin.measure.Application.main(Application.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
21/06/28 19:00:55 ERROR curator.ConnectionState: Connection timed out for connection string (zk:2181) and timeout (15000) / elapsed (31072)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
	(same stack trace as above, repeated on each retry)
21/06/28 19:01:11 ERROR curator.ConnectionState: Connection timed out for connection string (zk:2181) and timeout (15000) / elapsed (47080)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
	(same stack trace as above, repeated on each retry)

In the griffin.checkpoint entry of griffin/measure/target/env.json, set hosts to the address of the ZooKeeper server, in the form xxxx:2181:

"griffin.checkpoint" : [ { "type" : "zk" , "config" : { "hosts" : "hadoop101:2181" , "namespace" : "griffin/infocache" , "lock.path" : "lock" , "mode" : "persist" , "init.clear" : true , "close.clear" : false } } ] Copy code

12. java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.elasticsearch.spark.sql.DefaultSource15 not found

This problem is relatively rare. Googling turned up the two posts below, which approach it from different angles, but it ultimately comes down to one thing: version incompatibility.

  • github.com/elastic/ela... This one is about the versions of elasticsearch-hadoop, elasticsearch-spark-20_2.10 and related jars in the configuration. Since I was running the spark-streaming case and had not enabled ES at all, this was not my problem.
  • The other angle, www.reddit.com/r/apachespa..., is that the versions around spark-streaming-kafka-0-10_2.11:2.4.3 must correspond one to one: Spark 2.2.x or above, Kafka 0.10, Scala 2.11.

The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach: it offers simple parallelism, a 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses the new Kafka consumer API instead of the simple API, usage differs significantly, and this integration is marked experimental, so its API may change. The Maven repository only carries Spark-Kafka integrations for 0.8 and 0.10. Kafka 0.8 is a watershed release with plenty of bugs of its own and many patches on top: a 0.9 client against a 0.8 broker needs to be downgraded, producers can appear to hang, server.properties needs host.name set, and so on, which I won't go into here. In short, I could not get Kafka 0.8 to behave, and Kafka 0.11 and above is incompatible with Griffin itself, so I switched to the more robust Kafka 0.10; the package I used was kafka_2.11-0.10.2.2.tgz (Scala 2.11 build).

A reminder: after setting up Kafka, run it to check that it can produce and consume data normally (start ZooKeeper first). If something is off, consider whether you need to delete ZooKeeper's version-2 data directory. By default zk stores its data under dataDir=/tmp/zookeeper, though this varies; I had changed mine to zookeeper/zkData/ and simply deleted it.
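
A sketch of that reset and smoke test, with assumed paths and hostnames (the zkData directory, the hadoop101 broker, and the test topic are all placeholders for whatever your setup uses):

# stop zk, wipe its data directory (this removes all zk state, including Kafka metadata), then restart
zkServer.sh stop
rm -rf /opt/module/zookeeper/zkData/version-2
zkServer.sh start

# smoke test: produce a few lines on a test topic in one shell, consume them in another
kafka-console-producer.sh --broker-list hadoop101:9092 --topic test
kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic test --from-beginning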

13. Griffin version issues

The official Griffin documentation gives a compatibility list. Beyond the Kafka version settled in the previous item, Spark 2.4 and Hadoop 2.7 also work; note that the Scala bundled with Spark 2.4 is 2.11, which is fine. Don't use Spark 3.0 or above, because it bundles Scala 2.12, which the latest Griffin does not yet support (whether it ever will, I don't know). I also recommend Griffin 0.6.0: I haven't used 0.5.0 or earlier, but it is best to avoid the latest 0.7.0 from GitHub, since after compiling it I found the HDFS sink could not write files.

After a job succeeds you can see the application in the Spark backend, inspect the bad records under missRecord, and find the result of the run recorded in _Metrics.

14. com.google.gson.JsonIOException: JSON document was not fully consumed.

Exception in thread "main" com.google.gson.JsonIOException: JSON document was not fully consumed.
	at com.google.gson.Gson.assertFullConsumption(Gson.java:905)
	at com.google.gson.Gson.fromJson(Gson.java:898)
	at com.google.gson.Gson.fromJson(Gson.java:846)
	at com.google.gson.Gson.fromJson(Gson.java:817)
	at com.unionpay.magpie.util.GsonTest.main(GsonTest.java:13)

In my case this error appeared because the Kafka producer delivered data intermittently: pauses in the stream produced empty JSON strings terminated by "\0", which Gson refuses to parse. Parsing through a JsonReader works instead, because JsonReader reads token by token and stops once it reaches the closing "}". See the next item for the code.

15. Caused by: java.lang.NullPointerException

This comes from JSON parsing where the input line may be null. Check for that up front and return an empty string "" when the line is empty; the code is shown below.

public static String handleData(String line) {
    try {
        if (line != null && !line.equals("")) {
            Gson gson = new GsonBuilder().setLenient().setDateFormat("yyyy-MM-dd_HH:mm:ss").create();
            JsonReader reader = new JsonReader(new StringReader(line));
            Student student = gson.fromJson(reader, Student.class);
            int rand = ra.nextInt(10) + 1;
            if (rand > 8)
                student.setName(student.getName() + "_" + ra.nextInt(10));
            return gson.toJson(student);
        } else
            return "";
    } catch (Exception e) {
        return "";
    }
}

16. Use JsonReader.setLenient(true) to accept malformed JSON at line 1 column 1 path $

In my case the JSON string had a null value at some position; adding setLenient() to the GsonBuilder() chain accepts it.

The code is the same handleData method shown in the previous item (15); the relevant part is the .setLenient() call when building the Gson instance.

17. com.google.gson.JsonSyntaxException

In my case this was caused by a wrong time format. The fix is to specify the date conversion format when creating the Gson object, as done with setDateFormat("yyyy-MM-dd_HH:mm:ss") in the handleData code above.