use
5.010;
our
$VERSION
=
'1.08'
;
import
)
;
our
@EXPORT_OK
=
qw(
decode_fetch_request
decode_metadata_request
decode_offset_request
decode_produce_request
decode_saslhandshake_request
encode_fetch_response
encode_metadata_response
encode_offset_response
encode_produce_response
encode_saslhandshake_response
)
;
_ARRAY0
_HASH
_POSINT
_SCALAR
_STRING
)
;
dualvar
)
;
isint
)
;
%ERROR
$COMPRESSION_NONE
$ERROR_MISMATCH_ARGUMENT
$ERROR_NO_ERROR
$ERROR_NOT_BINARY_STRING
$ERROR_REQUEST_OR_RESPONSE
)
;
$APIKEY_FETCH
$APIKEY_METADATA
$APIKEY_OFFSET
$APIKEY_PRODUCE
)
;
$DEFAULT_APIVERSION
$CONSUMERS_REPLICAID
$NULL_BYTES_LENGTH
$_int64_template
_encode_MessageSet_array
_encode_string
_decode_MessageSet_array
_decode_MessageSet_template
_pack64
_unpack64
)
;
_is_suitable_int
)
;
# Pack/unpack templates used by the encoders and decoders below. Most of them
# are paired with the byte length of the fixed-size fields they describe
# ("l>" is a 4-byte and "s>" a 2-byte big-endian signed integer).

# Common start of the responses built through $response->{template}: the
# encoders pack the accumulated length, the CorrelationId and the size of the
# first array with it. The length of 8 covers the last two values; the leading
# length field does not count itself.
my $_Response_header_template = q{l>l>l>};
my $_Response_header_length   = 8;

# Produce response, per partition: Partition, ErrorCode, packed 8-byte Offset.
my $_ProduceResponse_body_template = q{l>s>a[8]};
my $_ProduceResponse_body_length   = 14;

# Fetch response, per partition: Partition, ErrorCode, packed 8-byte
# HighwaterMarkOffset.
my $_FetchResponse_body_template = q{l>s>a[8]};
my $_FetchResponse_body_length   = 14;

# SaslHandshake response.
# NOTE(review): the template begins with x[l] (four null bytes when packed),
# which the length of 10 (l> + s> + l>) does not account for - confirm against
# the encoder that uses it.
my $_saslhandshake_template        = q{x[l]l>s>l>};
my $_saslhandshake_template_length = 10;

# Offset response, per partition: Partition, ErrorCode, size of the Offset
# array.
my $_OffsetResponse_body_template = q{l>s>l>};
my $_OffsetResponse_body_length   = 10;

# Metadata response, per broker: NodeId, Host length, Host, Port. The length
# of 10 excludes the Host bytes, which the encoder adds separately.
my $_MetadataResponse_Broker_body_template = q{l>s>a*l>};
my $_MetadataResponse_Broker_body_length   = 10;

# Metadata response, per partition: ErrorCode, Partition, Leader, size of the
# Replicas array.
my $_MetadataResponse_PartitionMetadata_body_template = q{s>l>l>l>};
my $_MetadataResponse_PartitionMetadata_body_length   = 14;

# SaslHandshake request: six fixed-size fields; the 16-byte length is used as
# the offset at which the mechanism name starts.
my $_SaslHandshakeRequest_header_template = q{l>s>s>l>s>s>};
my $_SaslHandshakeRequest_header_length   = 16;

# Produce request: skip the 4-byte size, then ApiKey, ApiVersion,
# CorrelationId, ClientId (length-prefixed), RequiredAcks, Timeout and the
# topics array size. The length of 12 is the offset of the ClientId length
# field (4 + 2 + 2 + 4).
my $_ProduceRequest_header_template = q{x[l]s>s>l>s>/as>l>l>};
my $_ProduceRequest_header_length   = 12;

# Produce request, per topic: TopicName (length-prefixed), partitions array
# size, Partition.
my $_ProduceRequest_topic_body_template = q{s>/al>l>};

# Most recent error recorded by _protocol_error(); read by last_error() and
# last_errorcode().
my $_package_error;
# Encodes a SaslHandshake response into its binary form.
#
# $saslhandshake_response is a hash reference with the keys:
#   CorrelationId - integer, packed as a 4-byte value;
#   ErrorCode     - integer, packed as a 2-byte value;
#   Mechanisms    - array reference of mechanism names (a missing value is
#                   treated as an empty list).
#
# Returns the packed string: Size, CorrelationId, ErrorCode, number of
# mechanisms, then every mechanism as a 2-byte length followed by its bytes.
sub encode_saslhandshake_response {
    my ( $saslhandshake_response ) = @_;

    my @mechanisms = @{ $saslhandshake_response->{Mechanisms} // [] };

    # Size counts everything that follows the Size field: the fixed part
    # (CorrelationId + ErrorCode + count = 10 bytes) plus, for each mechanism,
    # its 2-byte length prefix and its name.
    my $len = $_saslhandshake_template_length;
    $len += 2 + length( $_ ) for @mechanisms;

    # One template field per packed value. The template is spelled out here so
    # that the bytes produced agree with $len: no padding is emitted and every
    # mechanism is consumed by its own "s>/a" field.
    my $tmpl = q{l>}                            # Size
        . q{l>s>l>}                             # CorrelationId, ErrorCode, count
        . ( q{s>/a} x scalar( @mechanisms ) );  # length-prefixed mechanisms

    return pack(
        $tmpl,
        $len,
        $saslhandshake_response->{CorrelationId},
        $saslhandshake_response->{ErrorCode},
        scalar( @mechanisms ),
        @mechanisms,
    );
}
# Decodes a binary SaslHandshake request.
#
# $bin_stream_ref must be a reference to the request string; otherwise the
# mismatch is recorded through _protocol_error() and nothing is returned.
#
# Returns a hash reference with the six header fields (Len, ApiKey,
# ApiVersion, CorrelationId, ClientId, MechanismLen) and Mechanism, the
# MechanismLen bytes that follow the fixed-size header.
# NOTE(review): ClientId receives a 2-byte integer and Mechanism is read at a
# fixed offset, which presumably assumes a request without ClientId bytes -
# confirm against the callers.
sub decode_saslhandshake_request {
    my ( $bin_stream_ref ) = @_;

    return _protocol_error( $ERROR_MISMATCH_ARGUMENT )
        unless _is_bin_stream_correct( $bin_stream_ref );

    my %request;
    @request{ qw( Len ApiKey ApiVersion CorrelationId ClientId MechanismLen ) }
        = unpack( $_SaslHandshakeRequest_header_template, $$bin_stream_ref );

    $request{Mechanism} = substr(
        $$bin_stream_ref,
        $_SaslHandshakeRequest_header_length,
        $request{MechanismLen},
    );

    return \%request;
}
# Decodes a binary Produce request.
#
# $bin_stream_ref must be a reference to the request string. The unpack
# template depends on the request content, so it is first assembled by
# _decode_produce_request_template() and then applied to the whole stream.
#
# Returns a hash reference with CorrelationId, ClientId, RequiredAcks,
# Timeout and topics (array of { TopicName, partitions }, each partition being
# { Partition, MessageSet }). Returns nothing, after recording the error
# through _protocol_error(), when the argument is not acceptable or the ApiKey
# / ApiVersion found in the stream are not the expected ones.
sub decode_produce_request {
    my ( $bin_stream_ref ) = @_;

    return _protocol_error( $ERROR_MISMATCH_ARGUMENT )
        unless _is_bin_stream_correct( $bin_stream_ref );

    # The helpers work on this structure: they read 'bin_stream', build
    # 'template' and look the unpacked values up through 'data'.
    my @fields;
    my $request = {
        bin_stream  => $bin_stream_ref,
        data        => \@fields,
    };

    _decode_produce_request_template( $request );
    @fields = unpack( $request->{template}, $$bin_stream_ref );

    # Index of the next unread value in @fields.
    my $pos = 0;

    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiKey' )
        unless $APIKEY_PRODUCE == $fields[ $pos++ ];
    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiVersion' )
        unless $DEFAULT_APIVERSION == $fields[ $pos++ ];

    my %produce_request;
    @produce_request{ qw( CorrelationId ClientId RequiredAcks Timeout ) }
        = @fields[ $pos .. $pos + 3 ];
    $pos += 4;

    my @topics;
    $produce_request{topics} = \@topics;

    my $topics_left = $fields[ $pos++ ];
    while ( $topics_left-- ) {
        my $topic_name      = $fields[ $pos++ ];
        my $partitions_left = $fields[ $pos++ ];

        my @partitions;
        while ( $partitions_left-- ) {
            my $partition_id     = $fields[ $pos++ ];
            my $message_set_size = $fields[ $pos++ ];

            # The helper receives a reference to $pos so that it can move the
            # read position past the values it handles.
            my @message_set;
            _decode_MessageSet_array( $request, $message_set_size, \$pos, \@message_set );

            push @partitions, {
                Partition   => $partition_id,
                MessageSet  => \@message_set,
            };
        }

        push @topics, {
            TopicName   => $topic_name,
            partitions  => \@partitions,
        };
    }

    return \%produce_request;
}
# Encodes a Produce response into its binary form.
#
# $Produce_Response is a hash reference:
#   CorrelationId => integer,
#   topics        => [                  # optional, defaults to an empty array
#       {
#           TopicName   => string,
#           partitions  => [            # optional, defaults to an empty array
#               {
#                   Partition   => integer,
#                   ErrorCode   => integer,
#                   Offset      => value accepted by _is_suitable_int(),
#               },
#           ],
#       },
#   ],
#
# Returns the packed string. When the argument or one of its fields is not
# acceptable, the error is recorded through _protocol_error() and nothing is
# returned.
sub encode_produce_response {
    my ( $Produce_Response ) = @_;

    _HASH( $Produce_Response )
        or return _protocol_error( $ERROR_MISMATCH_ARGUMENT );

    # Values to pack, in template order; _encode_string() appends to the same
    # array through $response->{data}.
    my @data;
    my $response = {
        data    => \@data,
    };

    my $correlation_id = $Produce_Response->{CorrelationId};
    defined( $correlation_id ) && isint( $correlation_id )
        or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'CorrelationId' );
    push @data, $correlation_id;
    $response->{template}   = $_Response_header_template;
    $response->{len}        = $_Response_header_length;

    # A missing 'topics' is encoded as an empty array, as the other response
    # encoders of this module do.
    my $topics_array = $Produce_Response->{topics} // [];
    _ARRAY0( $topics_array )
        or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'topics' );
    push @data, scalar( @$topics_array );

    foreach my $topic ( @$topics_array ) {
        # TopicName: 2-byte length, then the bytes added by _encode_string().
        $response->{template}   .= q{s>};
        $response->{len}        += 2;
        _encode_string( $response, $topic->{TopicName} );

        # Same default as for 'topics'.
        my $partitions_array = $topic->{partitions} // [];
        _ARRAY0( $partitions_array )
            or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'partitions' );
        push @data, scalar( @$partitions_array );
        $response->{template}   .= q{l>};   # partitions array size
        $response->{len}        += 4;

        foreach my $partition ( @$partitions_array ) {
            defined( $partition->{Partition} ) && isint( $partition->{Partition} )
                or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Partition' );
            push @data, $partition->{Partition};

            defined( $partition->{ErrorCode} ) && isint( $partition->{ErrorCode} )
                or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ErrorCode' );
            push @data, $partition->{ErrorCode};

            _is_suitable_int( $partition->{Offset} )
                or return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Offset' );
            push @data, _pack64( $partition->{Offset} );

            $response->{template}   .= $_ProduceResponse_body_template;
            $response->{len}        += $_ProduceResponse_body_length;
        }
    }

    # The accumulated length is the first packed value.
    return pack( $response->{template}, $response->{len}, @data );
}
# Unpack template of a whole Fetch request: skip the 4-byte size, then ApiKey,
# ApiVersion, CorrelationId, ClientId, ReplicaId, MaxWaitTime, MinBytes and
# the topics array size; "X[l]l>/(...)" steps back over a size just read and
# uses it as the repeat count of the group that follows. Whitespace inside a
# pack template is ignored, so the line break in the literal is inert.
my $_decode_fetch_request_template =
qq{x[l]s>s>l>s>/al>l>l>l>X[l]l>/(s>/al>X[l]l>/(l>${_int64_template}
l>))};

# Decodes a binary Fetch request.
#
# $bin_stream_ref must be a reference to the request string.
#
# Returns a hash reference with CorrelationId, ClientId, MaxWaitTime, MinBytes
# and topics (array of { TopicName, partitions }, each partition being
# { Partition, FetchOffset, MaxBytes }). Returns nothing, after recording the
# error through _protocol_error(), when the argument is not acceptable or the
# ApiKey, ApiVersion or ReplicaId found in the stream are not the expected
# ones.
sub decode_fetch_request {
    my ( $bin_stream_ref ) = @_;

    return _protocol_error( $ERROR_MISMATCH_ARGUMENT )
        unless _is_bin_stream_correct( $bin_stream_ref );

    my @fields = unpack( $_decode_fetch_request_template, $$bin_stream_ref );

    # Index of the next unread value in @fields.
    my $pos = 0;

    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiKey' )
        unless $APIKEY_FETCH == $fields[ $pos++ ];
    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiVersion' )
        unless $DEFAULT_APIVERSION == $fields[ $pos++ ];

    my %fetch_request;
    $fetch_request{CorrelationId}   = $fields[ $pos++ ];
    $fetch_request{ClientId}        = $fields[ $pos++ ];

    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ReplicaId' )
        unless $CONSUMERS_REPLICAID == $fields[ $pos++ ];

    $fetch_request{MaxWaitTime} = $fields[ $pos++ ];
    $fetch_request{MinBytes}    = $fields[ $pos++ ];

    my @topics;
    $fetch_request{topics} = \@topics;

    my $topics_left = $fields[ $pos++ ];
    while ( $topics_left-- ) {
        my $topic_name      = $fields[ $pos++ ];
        my $partitions_left = $fields[ $pos++ ];

        my @partitions;
        while ( $partitions_left-- ) {
            # The values are consumed in the order they appear in the stream.
            push @partitions, {
                Partition   => $fields[ $pos++ ],
                FetchOffset => _unpack64( $fields[ $pos++ ] ),
                MaxBytes    => $fields[ $pos++ ],
            };
        }

        push @topics, {
            TopicName   => $topic_name,
            partitions  => \@partitions,
        };
    }

    return \%fetch_request;
}
# Encodes a Fetch response into its binary form.
#
# $Fetch_Response is a hash reference:
#   CorrelationId => integer,
#   topics        => [                  # optional, defaults to an empty array
#       {
#           TopicName   => string,
#           partitions  => [            # optional, defaults to an empty array
#               {
#                   Partition           => integer,
#                   ErrorCode           => integer,
#                   HighwaterMarkOffset => value accepted by _is_suitable_int(),
#                   MessageSet          => passed to _encode_MessageSet_array(),
#               },
#           ],
#       },
#   ],
#
# Returns the packed string. When the argument or one of its fields is not
# acceptable, the error is recorded through _protocol_error() and nothing is
# returned.
sub encode_fetch_response {
    my ( $Fetch_Response ) = @_;

    return _protocol_error( $ERROR_MISMATCH_ARGUMENT )
        unless _HASH( $Fetch_Response );

    # Values to pack, in template order; the helpers append to the same array
    # through $response->{data}.
    my @data = ();
    my $response = { data => \@data };

    my $correlation_id = $Fetch_Response->{CorrelationId};
    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'CorrelationId' )
        unless defined( $correlation_id ) && isint( $correlation_id );
    push @data, $correlation_id;
    $response->{template}   = $_Response_header_template;
    $response->{len}        = $_Response_header_length;

    my $topics = $Fetch_Response->{topics} // [];
    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'topics' )
        unless _ARRAY0( $topics );
    push @data, scalar( @$topics );

    for my $topic ( @$topics ) {
        # TopicName: 2-byte length, then the bytes added by _encode_string().
        $response->{template}   .= q{s>};
        $response->{len}        += 2;
        _encode_string( $response, $topic->{TopicName} );

        my $partitions = $topic->{partitions} // [];
        return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'partitions' )
            unless _ARRAY0( $partitions );
        push @data, scalar( @$partitions );
        $response->{template}   .= q{l>};   # partitions array size
        $response->{len}        += 4;

        for my $partition ( @$partitions ) {
            my $partition_id = $partition->{Partition};
            return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Partition' )
                unless defined( $partition_id ) && isint( $partition_id );
            push @data, $partition_id;

            my $error_code = $partition->{ErrorCode};
            return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ErrorCode' )
                unless defined( $error_code ) && isint( $error_code );
            push @data, $error_code;

            return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'HighwaterMarkOffset' )
                unless _is_suitable_int( $partition->{HighwaterMarkOffset} );
            push @data, _pack64( $partition->{HighwaterMarkOffset} );

            $response->{template}   .= $_FetchResponse_body_template;
            $response->{len}        += $_FetchResponse_body_length;

            _encode_MessageSet_array( $response, $partition->{MessageSet} );
        }
    }

    # The accumulated length is the first packed value.
    return pack( $response->{template}, $response->{len}, @data );
}
# Unpack template of a whole Offset request: skip the 4-byte size, then
# ApiKey, ApiVersion, CorrelationId, ClientId, ReplicaId and the topics array
# size; "X[l]l>/(...)" steps back over a size just read and uses it as the
# repeat count of the group that follows. Whitespace inside a pack template is
# ignored, so the line break in the literal is inert.
my $_decode_offset_request_template =
qq{x[l]s>s>l>s>/al>l>X[l]l>/(s>/al>X[l]l>/(l>${_int64_template}
l>))};

# Decodes a binary Offset request.
#
# $bin_stream_ref must be a reference to the request string.
#
# Returns a hash reference with CorrelationId, ClientId and topics (array of
# { TopicName, partitions }, each partition being { Partition, Time,
# MaxNumberOfOffsets }). Returns nothing, after recording the error through
# _protocol_error(), when the argument is not acceptable or the ApiKey,
# ApiVersion or ReplicaId found in the stream are not the expected ones.
sub decode_offset_request {
    my ( $bin_stream_ref ) = @_;

    return _protocol_error( $ERROR_MISMATCH_ARGUMENT )
        unless _is_bin_stream_correct( $bin_stream_ref );

    my @fields = unpack( $_decode_offset_request_template, $$bin_stream_ref );

    # Index of the next unread value in @fields.
    my $pos = 0;

    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiKey' )
        unless $APIKEY_OFFSET == $fields[ $pos++ ];
    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiVersion' )
        unless $DEFAULT_APIVERSION == $fields[ $pos++ ];

    my %offset_request;
    $offset_request{CorrelationId}  = $fields[ $pos++ ];
    $offset_request{ClientId}       = $fields[ $pos++ ];

    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ReplicaId' )
        unless $CONSUMERS_REPLICAID == $fields[ $pos++ ];

    my @topics;
    $offset_request{topics} = \@topics;

    my $topics_left = $fields[ $pos++ ];
    while ( $topics_left-- ) {
        my $topic_name      = $fields[ $pos++ ];
        my $partitions_left = $fields[ $pos++ ];

        my @partitions;
        while ( $partitions_left-- ) {
            # The values are consumed in the order they appear in the stream.
            push @partitions, {
                Partition           => $fields[ $pos++ ],
                Time                => _unpack64( $fields[ $pos++ ] ),
                MaxNumberOfOffsets  => $fields[ $pos++ ],
            };
        }

        push @topics, {
            TopicName   => $topic_name,
            partitions  => \@partitions,
        };
    }

    return \%offset_request;
}
# Encodes an Offset response into its binary form.
#
# $Offset_Response is a hash reference:
#   CorrelationId => integer,
#   topics        => [                      # optional, defaults to []
#       {
#           TopicName           => string,
#           PartitionOffsets    => [        # optional, defaults to []
#               {
#                   Partition   => integer,
#                   ErrorCode   => integer,
#                   Offset      => [ ... ], # optional, defaults to []; values
#                                           # accepted by _is_suitable_int()
#               },
#           ],
#       },
#   ],
#
# Returns the packed string. When the argument or one of its fields is not
# acceptable, the error is recorded through _protocol_error() and nothing is
# returned.
sub encode_offset_response {
    my ( $Offset_Response ) = @_;

    return _protocol_error( $ERROR_MISMATCH_ARGUMENT )
        unless _HASH( $Offset_Response );

    # Values to pack, in template order; _encode_string() appends to the same
    # array through $response->{data}.
    my @data;
    my $response = { data => \@data };

    my $correlation_id = $Offset_Response->{CorrelationId};
    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'CorrelationId' )
        unless defined( $correlation_id ) && isint( $correlation_id );
    push @data, $correlation_id;
    $response->{template}   = $_Response_header_template;
    $response->{len}        = $_Response_header_length;

    my $topics = $Offset_Response->{topics} // [];
    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'topics' )
        unless _ARRAY0( $topics );
    push @data, scalar( @$topics );

    for my $topic ( @$topics ) {
        # TopicName: 2-byte length, then the bytes added by _encode_string().
        $response->{template}   .= q{s>};
        $response->{len}        += 2;
        _encode_string( $response, $topic->{TopicName} );

        my $partition_offsets = $topic->{PartitionOffsets} // [];
        return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'PartitionOffsets' )
            unless _ARRAY0( $partition_offsets );
        push @data, scalar( @$partition_offsets );
        $response->{template}   .= q{l>};   # PartitionOffsets array size
        $response->{len}        += 4;

        for my $entry ( @$partition_offsets ) {
            my $partition_id = $entry->{Partition};
            return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Partition' )
                unless defined( $partition_id ) && isint( $partition_id );
            push @data, $partition_id;

            my $error_code = $entry->{ErrorCode};
            return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ErrorCode' )
                unless defined( $error_code ) && isint( $error_code );
            push @data, $error_code;

            my $offsets = $entry->{Offset} // [];
            return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Offset array' )
                unless _ARRAY0( $offsets );
            push @data, scalar( @$offsets );

            # Covers Partition, ErrorCode and the Offset array size.
            $response->{template}   .= $_OffsetResponse_body_template;
            $response->{len}        += $_OffsetResponse_body_length;

            for my $offset ( @$offsets ) {
                return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Offset' )
                    unless _is_suitable_int( $offset );
                push @data, _pack64( $offset );
                $response->{template}   .= q{a[8]};     # packed 8-byte Offset
                $response->{len}        += 8;
            }
        }
    }

    # The accumulated length is the first packed value.
    return pack( $response->{template}, $response->{len}, @data );
}
# Unpack template of a whole Metadata request: skip the 4-byte size, then
# ApiKey, ApiVersion, CorrelationId, ClientId and the topics array size;
# "X[l]l>/(s>/a)" steps back over that size and uses it as the number of
# length-prefixed topic names to read.
my $_decode_metadata_request_template = q{x[l]s>s>l>s>/al>X[l]l>/(s>/a)};

# Decodes a binary Metadata request.
#
# $bin_stream_ref must be a reference to the request string.
#
# Returns a hash reference with CorrelationId, ClientId and topics (array of
# topic names). Returns nothing, after recording the error through
# _protocol_error(), when the argument is not acceptable or the ApiKey /
# ApiVersion found in the stream are not the expected ones.
sub decode_metadata_request {
    my ( $bin_stream_ref ) = @_;

    return _protocol_error( $ERROR_MISMATCH_ARGUMENT )
        unless _is_bin_stream_correct( $bin_stream_ref );

    my @fields = unpack( $_decode_metadata_request_template, $$bin_stream_ref );

    # Index of the next unread value in @fields.
    my $pos = 0;

    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiKey' )
        unless $APIKEY_METADATA == $fields[ $pos++ ];
    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ApiVersion' )
        unless $DEFAULT_APIVERSION == $fields[ $pos++ ];

    my %metadata_request;
    $metadata_request{CorrelationId}    = $fields[ $pos++ ];
    $metadata_request{ClientId}         = $fields[ $pos++ ];

    my @topics;
    $metadata_request{topics} = \@topics;

    my $topics_left = $fields[ $pos++ ];
    push @topics, $fields[ $pos++ ] while $topics_left--;

    return \%metadata_request;
}
# Encodes a Metadata response into its binary form.
#
# $Metadata_Response is a hash reference:
#   CorrelationId => integer,
#   Broker        => [                      # optional, defaults to []
#       { NodeId => integer, Host => non-empty string, Port => positive integer },
#   ],
#   TopicMetadata => [                      # optional, defaults to []
#       {
#           ErrorCode           => integer,
#           TopicName           => string,
#           PartitionMetadata   => [        # optional, defaults to []
#               {
#                   ErrorCode   => integer,
#                   Partition   => integer,
#                   Leader      => integer,
#                   Replicas    => [ integers ],    # optional, defaults to []
#                   Isr         => [ integers ],    # optional, defaults to []
#               },
#           ],
#       },
#   ],
#
# Returns the packed string. When the argument or one of its fields is not
# acceptable, the error is recorded through _protocol_error() and nothing is
# returned.
sub encode_metadata_response {
    my ( $Metadata_Response ) = @_;

    return _protocol_error( $ERROR_MISMATCH_ARGUMENT )
        unless _HASH( $Metadata_Response );

    # Values to pack, in template order; _encode_string() appends to the same
    # array through $response->{data}.
    my @data;
    my $response = { data => \@data };

    my $correlation_id = $Metadata_Response->{CorrelationId};
    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'CorrelationId' )
        unless defined( $correlation_id ) && isint( $correlation_id );
    push @data, $correlation_id;
    $response->{template}   = $_Response_header_template;
    $response->{len}        = $_Response_header_length;

    #-- Brokers

    my $brokers = $Metadata_Response->{Broker} // [];
    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Broker' )
        unless _ARRAY0( $brokers );
    push @data, scalar( @$brokers );

    for my $broker ( @$brokers ) {
        my $node_id = $broker->{NodeId};
        return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'NodeId' )
            unless defined( $node_id ) && isint( $node_id );
        push @data, $node_id;

        my $host = $broker->{Host};
        return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Host' )
            unless defined $host;
        # _verify_string() records its own error before returning false.
        return
            unless _verify_string( $host, 'Host' );

        my $host_length = length $host;
        return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Host' )
            unless _STRING( $host );
        push @data, $host_length, $host;
        $response->{len} += $host_length;

        my $port = $broker->{Port};
        return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Port' )
            unless _POSINT( $port );
        push @data, $port;

        # Covers NodeId, Host length, Host and Port; the Host bytes were
        # already added to the length above.
        $response->{template}   .= $_MetadataResponse_Broker_body_template;
        $response->{len}        += $_MetadataResponse_Broker_body_length;
    }

    #-- Topics

    my $topics_metadata = $Metadata_Response->{TopicMetadata} // [];
    return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'TopicMetadata' )
        unless _ARRAY0( $topics_metadata );
    push @data, scalar( @$topics_metadata );
    $response->{template}   .= q{l>};   # TopicMetadata array size
    $response->{len}        += 4;

    for my $topic_metadata ( @$topics_metadata ) {
        my $topic_error = $topic_metadata->{ErrorCode};
        return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ErrorCode' )
            unless defined( $topic_error ) && isint( $topic_error );
        push @data, $topic_error;

        # ErrorCode and the TopicName length; the name bytes are added by
        # _encode_string().
        $response->{template}   .= q{s>s>};
        $response->{len}        += 4;
        _encode_string( $response, $topic_metadata->{TopicName} );

        my $partitions_metadata = $topic_metadata->{PartitionMetadata} // [];
        return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'PartitionMetadata' )
            unless _ARRAY0( $partitions_metadata );
        push @data, scalar( @$partitions_metadata );
        $response->{template}   .= q{l>};   # PartitionMetadata array size
        $response->{len}        += 4;

        for my $partition_metadata ( @$partitions_metadata ) {
            my $partition_error = $partition_metadata->{ErrorCode};
            return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'ErrorCode' )
                unless defined( $partition_error ) && isint( $partition_error );
            push @data, $partition_error;

            my $partition_id = $partition_metadata->{Partition};
            return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Partition' )
                unless defined( $partition_id ) && isint( $partition_id );
            push @data, $partition_id;

            my $leader = $partition_metadata->{Leader};
            return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Leader' )
                unless defined( $leader ) && isint( $leader );
            push @data, $leader;

            my $replicas = $partition_metadata->{Replicas} // [];
            return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Replicas' )
                unless _ARRAY0( $replicas );
            push @data, scalar( @$replicas );

            # Covers ErrorCode, Partition, Leader and the Replicas array size.
            $response->{template}   .= $_MetadataResponse_PartitionMetadata_body_template;
            $response->{len}        += $_MetadataResponse_PartitionMetadata_body_length;

            for my $replica_id ( @$replicas ) {
                return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Replicas ReplicaId' )
                    unless defined( $replica_id ) && isint( $replica_id );
                push @data, $replica_id;
                $response->{template}   .= q{l>};
                $response->{len}        += 4;
            }

            my $in_sync = $partition_metadata->{Isr} // [];
            return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Isr' )
                unless _ARRAY0( $in_sync );
            push @data, scalar( @$in_sync );
            $response->{template}   .= q{l>};   # Isr array size
            $response->{len}        += 4;

            for my $replica_id ( @$in_sync ) {
                return _protocol_error( $ERROR_REQUEST_OR_RESPONSE, 'Isr ReplicaId' )
                    unless defined( $replica_id ) && isint( $replica_id );
                push @data, $replica_id;
                $response->{template}   .= q{l>};
                $response->{len}        += 4;
            }
        }
    }

    # The accumulated length is the first packed value.
    return pack( $response->{template}, $response->{len}, @data );
}
# Returns the numeric value of the error most recently recorded by
# _protocol_error(), or 0 when no error has been recorded.
sub last_errorcode {
    my $recorded = $_package_error // 0;

    # Adding 0 yields a plain number instead of the stored dual value.
    return $recorded + 0;
}
# Returns the text of the error most recently recorded by _protocol_error(),
# or an empty string when no error has been recorded.
sub last_error {
    my $recorded = $_package_error // q{};

    # Interpolation yields a plain string instead of the stored dual value.
    return "$recorded";
}
# Builds the unpack template for a Produce request.
#
# $request is a hash reference whose 'bin_stream' is a reference to the binary
# request. The sub stores the assembled template in $request->{template} and
# keeps the current read position in $request->{stream_offset}, peeking into
# the stream for the variable-length parts. Always returns nothing.
sub _decode_produce_request_template {
    my ( $request ) = @_;

    # Reads one value of the given pack code at the current stream offset.
    my $peek = sub {
        my ( $code ) = @_;
        return unpack(
            q{x[} . $request->{stream_offset} . q{]} . $code,
            ${ $request->{bin_stream} }
        );
    };

    $request->{template}        = $_ProduceRequest_header_template;
    $request->{stream_offset}   = $_ProduceRequest_header_length;

    # Step over the ClientId (2-byte length + bytes) and the 6 bytes of the
    # two header fields that follow it.
    my $client_id_size = $peek->( q{s>} );
    $request->{stream_offset} += 8 + $client_id_size;

    my $topics_left = $peek->( q{l>} );
    $request->{stream_offset} += 4;     # topics array size

    while ( $topics_left-- ) {
        $request->{template} .= $_ProduceRequest_topic_body_template;

        my $topic_name_size = $peek->( q{s>} );
        $request->{stream_offset} += 2 + $topic_name_size;

        # NOTE(review): the partitions array size is read but not used, so the
        # template describes a single partition per topic - confirm that this
        # is intended.
        my $partitions_count = $peek->( q{l>} );
        $request->{stream_offset} += 8;     # partitions array size + Partition

        _decode_MessageSet_template( $request );
    }

    return;
}
# Checks that the argument is accepted by _SCALAR() and that the value it
# refers to is accepted by _STRING(). Returns the false result of the first
# check when it fails, otherwise the result of the second check.
sub _is_bin_stream_correct {
    my ( $bin_stream_ref ) = @_;

    my $scalar_ref = _SCALAR( $bin_stream_ref );
    return $scalar_ref
        unless $scalar_ref;

    return _STRING( $$bin_stream_ref );
}
# Verifies that $string can be encoded as a protocol string.
#
# $string      - the value to check;
# $description - text added to the error message when the check fails.
#
# Returns 1 for an empty string and for a value that _STRING() accepts and
# that is not flagged as UTF-8. Otherwise the error is recorded through
# _protocol_error() and nothing is returned.
sub _verify_string {
    my ( $string, $description ) = @_;

    # The empty string is always acceptable. The 'defined' guard keeps an
    # undefined value from comparing equal to q{} (with a warning), so it is
    # rejected by the _STRING() check below.
    return 1
        if defined( $string ) && $string eq q{};

    defined( _STRING( $string ) )
        or return _protocol_error( $ERROR_MISMATCH_ARGUMENT, $description );

    # Only binary (non-UTF-8-flagged) strings may be packed.
    utf8::is_utf8( $string )
        and return _protocol_error( $ERROR_NOT_BINARY_STRING, $description );

    return 1;
}
# Records an error for last_error() / last_errorcode().
#
# $error_code  - key of %ERROR; becomes the numeric value of the record;
# $description - optional detail appended to the %ERROR text after ': '.
#
# Always returns nothing, so callers can write "return _protocol_error(...)".
sub _protocol_error {
    my ( $error_code, $description ) = @_;

    my $detail  = $description ? ': ' . $description : q{};
    my $message = $ERROR{ $error_code } . $detail;

    $_package_error = dualvar( $error_code, $message );

    return;
}
1;