Using the Amazon Kinesis Client Library for Java for data stream management

Amazon Kinesis is a service for processing streaming data on the AWS cloud platform. The Amazon Kinesis Client Library (KCL) for Java is a library that simplifies consuming Amazon Kinesis data streams: it handles lease coordination, shard enumeration and checkpointing so that developers can focus on the record-processing logic. Before using the KCL, you need to create and configure an Amazon Kinesis data stream. Once the stream has been created, the library can be used to process and manage the data in it. Here are some common examples of using the Amazon Kinesis Client Library for Java:

1. Process the records in a data stream

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.nio.charset.StandardCharsets;

/**
 * A KCL 2.x record processor that logs every record it receives.
 *
 * <p>The KCL creates one instance of this class per shard (lease) owned by the
 * worker, so instance state such as {@code shardId} is confined to one shard.
 */
public class DataStreamProcessor implements ShardRecordProcessor {

    // Shard this processor instance is bound to; set once in initialize().
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        // Remember the shard id so later log messages can reference it.
        shardId = initializationInput.shardId();
        System.out.println("Initialized. ShardId: " + shardId);
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // Handle each record delivered in this batch.
        for (KinesisClientRecord record : processRecordsInput.records()) {
            // record.data() is a read-only ByteBuffer; copy it out before decoding.
            byte[] data = new byte[record.data().remaining()];
            record.data().get(data);
            String partitionKey = record.partitionKey();
            String dataString = new String(data, StandardCharsets.UTF_8);
            System.out.println("Processing record. PartitionKey: " + partitionKey
                    + ", Data: " + dataString);
            // Perform the application-specific processing logic here.
            // ...
        }
        // Checkpoint after the batch so a restarted worker resumes
        // after the last successfully processed record.
        try {
            processRecordsInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            System.err.println("Error while trying to checkpoint in record processor for shard "
                    + shardId + ": " + e);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        // Another worker took the lease; checkpointing is no longer allowed here.
        System.out.println("Lease lost for shard " + shardId);
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        // The shard was closed (split or merge). Checkpointing at shard end is
        // required to signal that processing of this shard is complete.
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            System.err.println("Error while trying to checkpoint after shard ended. Error: " + e);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        // Graceful shutdown: checkpoint so another worker can resume where we stopped.
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            System.err.println("Error while trying to checkpoint before shutdown. Error: " + e);
        }
    }
}

2.
Create a data stream processor and start import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; import software.amazon.kinesis.checkpoint.Checkpointer; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinatorConfig; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseTaker; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShutdownReason; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; import software.amazon.kinesis.processor.StreamshardRecordProcessor; import software.amazon.kinesis.processor.StreamshardRecordProcessorConfig; import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.KinesisDataFetcher; import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.samples.flink.KinesisConfigConstants; import software.amazon.kinesis.samples.flink.KinesisDataFetcherThread; import software.amazon.kinesis.samples.flink.RecordPublisher; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.util.Collections; import java.util.List; import java.util.UUID; public class DataStreamProcessor { private static final Log LOG = LogFactory.getLog(DataStreamProcessor.class); public static void main(String[] args) { // Deared Amazon Kinesis Client Library for Java final String streamName = "<stream_name>"; final String region = "<aws_region>"; final String workerId = UUID.randomUUID().toString(); final int numRetries = 10; final long backoffTimeInMillis = 1500L; final long 
failoverTimeInMillis = 30000L; final AwsCredentialsProvider credentialsProvider = new DefaultCredentialsProvider(); // Configure Amazon Kinesis data obtain component KinesisDataFetcherConfig kinesisDataFetcherConfig = new KinesisDataFetcherConfig(); kinesisDataFetcherConfig.regionName = region; kinesisDataFetcherConfig.streamName = streamName; kinesisDataFetcherConfig.kinesis = KinesisAsyncClient.builder().region(region).build(); kinesisDataFetcherConfig.recordPublisher = new RecordPublisher(); final KinesisDataFetcher kinesisDataFetcher = new KinesisDataFetcher(kinesisDataFetcherConfig); // Configure lease coordination components final LeaseRefresher.LeaseRefresherFactory leaseRefresherFactory = new LeaseRefresher.LeaseRefresherFactory() { @Override public LeaseRefresher create( final LeaseCoordinatorConfig leaseCoordinatorConfig, final StreamConsumerConfig streamConfig) { return new KinesisClientLibLeaseRefresher(kinesisDataFetcherConfig.kinesis, leaseCoordinatorConfig, streamConfig); } }; final LeaseCoordinator.LeaseCoordinatorConfig leaseCoordinatorConfig = new LeaseCoordinator.LeaseCoordinatorConfig( streamName, kinesisDataFetcherConfig.kinesis.describeStream(streamName).get().streamDescription(), workerId, credentialsProvider, numRetries, backoffTimeInMillis, failoverTimeInMillis); final StreamshardRecordProcessor.StreamshardRecordProcessorFactory mkRecordProcessorFactory = new StreamshardRecordProcessor.StreamshardRecordProcessorFactory() { @Override public StreamshardRecordProcessor createProcessor() { return new DataStreamProcessor(); } }; final LeaseTaker.LeaseTakerConfig leaseTakerConfig = new LeaseTaker.LeaseTakerConfig( streamName, kinesisDataFetcherConfig.kinesis.describeStream(streamName).get().streamDescription(), workerId, credentialsProvider); final ShardDetector.ShardDetectorConfig shardDetectorConfig = new ShardDetector.ShardDetectorConfig( streamName, kinesisDataFetcherConfig.kinesis.describeStream(streamName).get().streamDescription(), 
numRetries, credentialsProvider, backoffTimeInMillis); final StreamingConfigBuilder streamingConfigBuilder = new StreamingConfigBuilder(); final LifecycleConfig lifecycleConfig = new LifecycleConfig(new RecordProcessorCheckpointer() { @Override public void checkpoint(RecordProcessorCheckpointer.CheckpointOption checkpointOption) { // Submit the checkpoint of the recorder to Amazon Kinesis try { synchronized (kinesisDataFetcher) { kinesisDataFetcher.checkpoint(); } } catch (Exception e) { LOG.warn("Caught exception when checkpointing with KinesisAsyncClient", e); throw e; } } @Override public void checkpoint(long sequenceNumber) { // Do not support the way to submit serial numbers in a separate checkpoint throw new UnsupportedOperationException(); } }, streamingConfigBuilder); final KinesisClientLibLeaseCoordinator leaseCoordinator = new KinesisClientLibLeaseCoordinator(leaseCoordinatorConfig, kinesisDataFetcherConfig.kinesis, leaseTakerConfig, shardDetectorConfig, leaseRefresherFactory, lifecycleConfig, mkRecordProcessorFactory); // Start the data stream processor try { leaseCoordinator.initialize(); leaseCoordinator.run(); } catch (KinesisClientLibException e) { LOG.error("Caught exception while initializing LeaseCoordinator", e); } } // Implement the StreamShardrecordProcessor interface, and customize the processing logic public void processRecords(KinesisClientRecord inputRecords) { for (KinesisClientRecord record : inputRecords.getRecords()) { String data = new String(record.getData().array()); System.out.println("Processing record: " + data); // Treatment logic // ... } } // Other interface methods // ... 
} Through the above example code, we can see how to use Amazon Kinesis Client Library for Java to process and manage data streams.In the first example, we implemented the KinesisClientRecordProcessor interface, including initialization, processing records, lease loss, sliced ending and closing requests.In the second example, we demonstrated how to configure and start the data stream processor, including Kinesis data obtaining components and lease coordination components, and achieved custom recording processing logic. In summary, Amazon Kinesis Client Library for Java is a powerful tool to simplify the data flow management of developers when using Amazon Kinesis services.Whether it is processing data, management leases, or startup processors, using this type of library can greatly simplify these operations, so that we can focus more on processing streaming data itself.