It makes sense to hook in at the Serializer and Deserializer level and let producer and consumer developers use the convenient interface provided by Kafka. Although recent Kafka versions allow ExtendedSerializers and ExtendedDeserializers to access headers, we chose to include the schema identifier in the key and value of Kafka records instead of adding message headers.
Apache Avro
Apache Avro is a data serialization (and remote procedure call) framework. It uses a JSON document, called a schema, to describe data structures. Most Apache Avro use is through either GenericRecord or subclasses of SpecificRecord. The latter are Java classes generated from Apache Avro schemas, while the former can be used without prior knowledge of the data structure being worked with.
If two schemas meet a set of compatibility rules, data written with one schema (called the writer schema) can be read as if it had been written with the other (called the reader schema). Schemas have a canonical form that has all the details irrelevant to serialization, such as documentation, stripped off to help check equivalence.
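To make this concrete, here is a minimal sketch using Avro's built-in SchemaCompatibility and SchemaNormalization utility classes; the Person schema is an invented example:

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaNormalization;

public class CompatibilityDemo {
  public static void main(String[] args) {
    // Writer schema: the schema the data was serialized with.
    Schema writer = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Person\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"}]}");

    // Reader schema: adds a field with a default value, which keeps it
    // compatible with data written using the writer schema.
    Schema reader = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Person\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"age\",\"type\":\"int\",\"default\":-1}]}");

    // Prints COMPATIBLE: data written with the writer schema can be
    // read as if it had been written with the reader schema.
    System.out.println(SchemaCompatibility
        .checkReaderWriterCompatibility(reader, writer).getType());

    // The canonical ("parsing") form, with documentation and other
    // serialization-irrelevant attributes stripped off.
    System.out.println(SchemaNormalization.toParsingForm(writer));
  }
}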
Versioned Schema and Schema Provider in Apache Avro
As mentioned earlier, we need a one-to-one mapping between schemas and their identifiers. Sometimes it is simpler to refer to schemas by name. When a compatible schema is created, it can be considered the next version of the schema. Thus we can also refer to schemas with a name, version pair.
Let's call the schema, its identifier, name, and version together a VersionedSchema. This object could hold additional metadata the application requires.
public class VersionedSchema {
  private final int id;
  private final String name;
  private final int version;
  private final Schema schema;

  public VersionedSchema(int id, String name, int version, Schema schema) {
    this.id = id;
    this.name = name;
    this.version = version;
    this.schema = schema;
  }

  public String getName() {
    return name;
  }

  public int getVersion() {
    return version;
  }

  public Schema getSchema() {
    return schema;
  }

  public int getId() {
    return id;
  }
}
We also need a simple interface for looking up schemas. How this interface can be implemented will be discussed in a future blog post, "Implementing a Schema Store."
public interface SchemaProvider extends AutoCloseable {
  public VersionedSchema get(int id);
  public VersionedSchema get(String schemaName, int schemaVersion);
  public VersionedSchema getMetadata(Schema schema);
}
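Until that post, a minimal in-memory sketch can stand in for a real schema store; InMemorySchemaProvider and its constructor are assumptions for illustration only:

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;

// Hypothetical stand-in for the schema store described in the follow-up post.
public class InMemorySchemaProvider implements SchemaProvider {
  private final Map<Integer, VersionedSchema> byId = new HashMap<>();
  private final Map<String, VersionedSchema> byNameAndVersion = new HashMap<>();

  public InMemorySchemaProvider(Collection<VersionedSchema> schemas) {
    for (VersionedSchema s : schemas) {
      byId.put(s.getId(), s);
      byNameAndVersion.put(s.getName() + ":" + s.getVersion(), s);
    }
  }

  @Override
  public VersionedSchema get(int id) {
    return byId.get(id);
  }

  @Override
  public VersionedSchema get(String schemaName, int schemaVersion) {
    return byNameAndVersion.get(schemaName + ":" + schemaVersion);
  }

  @Override
  public VersionedSchema getMetadata(Schema schema) {
    // Linear scan is fine for a handful of schemas; a real store would index this.
    return byId.values().stream()
        .filter(s -> s.getSchema().equals(schema))
        .findFirst().orElse(null);
  }

  @Override
  public void close() {}
}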
Serializing Generic Data in Apache Avro
First we need to determine which schema to use when serializing a record. Every record has a getSchema method, but finding out the schema identifier from the schema can be time-consuming. It is generally more efficient to set the schema at initialization time. This can be done by identifier directly, or by name and version. Furthermore, when producing to multiple topics, we might want to set different schemas for different topics and find out the schema from the topic name supplied as a parameter to the serialize(T, String) method. This logic is omitted in our examples for the sake of brevity and simplicity.
private VersionedSchema getSchema(T data, String topic) {
  return schemaProvider.getMetadata(data.getSchema());
}
With the schema in hand, we need to store its identifier in our message. Serializing the identifier as part of the message gives us a compact solution, because all the magic happens in the Serializer/Deserializer. It also enables very easy integration with other frameworks and libraries that already support Kafka and let the user use their own serializer (such as Spark).
Using this approach, we first write the schema identifier onto the first four bytes of the message.
private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {
  try (DataOutputStream os = new DataOutputStream(stream)) {
    os.writeInt(id);
  }
}
Then we can create a DatumWriter and serialize the object.
private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
  DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema);
  datumWriter.write(data, encoder);
  encoder.flush();
}
Putting it all together, we have implemented a generic data serializer.
public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> {

  private SchemaProvider schemaProvider;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    schemaProvider = SchemaUtils.getSchemaProvider(configs);
  }

  @Override
  public byte[] serialize(String topic, T data) {
    try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
      VersionedSchema schema = getSchema(data, topic);
      writeSchemaId(stream, schema.getId());
      writeSerializedAvro(stream, data, schema.getSchema());
      return stream.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Cannot serialize data", e);
    }
  }

  private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...}

  private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...}

  private VersionedSchema getSchema(T data, String topic) {...}

  @Override
  public void close() {
    try {
      schemaProvider.close();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
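A quick sketch of how this serializer could be wired into a producer. The schemaprovider.class config key, the com.example package, and FileSchemaProvider are invented for illustration; the real configuration keys depend on how SchemaUtils.getSchemaProvider is implemented.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "com.example.KafkaAvroSerializer");
    // Hypothetical key read by SchemaUtils.getSchemaProvider(configs).
    props.put("schemaprovider.class", "com.example.FileSchemaProvider");

    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Person\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"}]}");
    GenericRecord person = new GenericData.Record(schema);
    person.put("name", "Jane");

    // GenericData.Record implements GenericContainer, so the serializer
    // can call getSchema() on it.
    try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("persons", "key-1", person));
    }
  }
}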
Deserializing Generic Data in Apache Avro
Deserialization can work with a single schema (the one the data was written with), but you can specify a different reader schema. The reader schema has to be compatible with the schema the data was serialized with, but it need not be identical. This is why we introduced schema names. We can now specify that we want to read data with a specific version of a schema. At initialization time we read the desired schema version per schema name and store the metadata in readerSchemasByName for quick access. Now we can read every record written with a compatible version of the schema as if it were written with the specified version.
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  this.schemaProvider = SchemaUtils.getSchemaProvider(configs);
  this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider);
}
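SchemaUtils.getVersionedSchemas is not shown in this post; a plausible sketch, assuming a hypothetical config entry such as reader.schemas=Person:2,Address:1 listing the desired version per schema name (java.util.HashMap and java.util.Map imports assumed):

private static Map<String, VersionedSchema> getVersionedSchemas(
    Map<String, ?> configs, SchemaProvider schemaProvider) {
  Map<String, VersionedSchema> schemasByName = new HashMap<>();
  // Hypothetical config format: "Person:2,Address:1"
  String spec = configs.get("reader.schemas").toString();
  for (String entry : spec.split(",")) {
    String[] nameAndVersion = entry.split(":");
    VersionedSchema schema = schemaProvider.get(
        nameAndVersion[0], Integer.parseInt(nameAndVersion[1]));
    schemasByName.put(schema.getName(), schema);
  }
  return schemasByName;
}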
When a record needs to be deserialized, we first read the identifier of the writer's schema. The reader schema can then be looked up by the writer schema's name. With both schemas available, we can create a GenericDatumReader and read the record.
@Override
public GenericData.Record deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);
    VersionedSchema readerSchema =
        readerSchemasByName.get(writerSchema.getName());
    GenericData.Record avroRecord = readAvroRecord(stream,
        writerSchema.getSchema(), readerSchema.getSchema());
    return avroRecord;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
private int readSchemaId(InputStream stream) throws IOException {
  try (DataInputStream is = new DataInputStream(stream)) {
    return is.readInt();
  }
}
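For completeness, a sketch of a consumer using this deserializer; the com.example class name and the reader.schemas key are again hypothetical:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "avro-demo");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "com.example.KafkaAvroDeserializer");
    // Hypothetical config: read Person records as schema version 2.
    props.put("reader.schemas", "Person:2");

    try (KafkaConsumer<String, GenericData.Record> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("persons"));
      for (ConsumerRecord<String, GenericData.Record> record :
          consumer.poll(Duration.ofSeconds(1))) {
        System.out.println(record.value().get("name"));
      }
    }
  }
}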
About Specific Records in Apache Avro
More often than not, there is one class we want to use for our records. This class is then usually generated from an Apache Avro schema. Apache Avro provides tools for generating Java code from schemas. One such tool is the Apache Avro Maven plugin. Generated classes have the schema they were generated from available at runtime. This makes serialization and deserialization simpler and more effective. For serialization, we can use the class to find out the schema identifier to use.
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString();
  try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) {
    Class<?> recordClass = Class.forName(className);
    Schema writerSchema = new SpecificData(recordClass.getClassLoader()).getSchema(recordClass);
    this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
This way we do not need the logic to determine the topic and the data schema. To write records, we use the schema available in the record class.
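The serialize method itself is not shown in this post; a sketch that mirrors the generic serializer, using the writerSchemaId captured in configure:

@Override
public byte[] serialize(String topic, T data) {
  try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
    // The schema identifier was resolved once, at configuration time.
    writeSchemaId(stream, writerSchemaId);
    // Specific records carry their own schema.
    writeSerializedAvro(stream, data, data.getSchema());
    return stream.toByteArray();
  } catch (IOException e) {
    throw new RuntimeException("Cannot serialize data", e);
  }
}

Deserialization again starts by reading the writer's schema identifier: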
@Override
public T deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);
    return readAvroRecord(stream, writerSchema.getSchema(), readerSchema);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  return datumReader.read(null, decoder);
}
Likewise, for deserialization, the reader schema can be extracted from the class itself. Deserialization logic becomes simpler, because the reader schema is fixed at initialization time and does not need to be looked up by schema name.
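A sketch of the corresponding configure method, mirroring the serializer's, that derives the reader schema from the configured record class:

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  try {
    String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString();
    Class<?> recordClass = Class.forName(className);
    // The reader schema is fixed from here on; no lookup by name is needed.
    this.readerSchema = new SpecificData(recordClass.getClassLoader()).getSchema(recordClass);
    this.schemaProvider = SchemaUtils.getSchemaProvider(configs);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}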
Conclusion
I hope this post helped you understand how to serialize and deserialize both generic and specific data with Apache Avro in Kafka.