/ / Existe uma API do cliente JSON REST orientada a eventos para Java? - java, json, resto

Existe uma API do cliente JSON REST orientada a eventos para Java? - java, json, resto

Eu tenho um aplicativo Java que usa a API RestTemplate do Spring para escrever consumidores concisos e legíveis dos serviços JSON REST:

Em essência:

 RestTemplate rest = new RestTemplate(clientHttpRequestFactory);
ResponseEntity<ItemList> response = rest.exchange(url,
HttpMethod.GET,
requestEntity,
ItemList.class);

for(Item item : response.getBody().getItems()) {
handler.onItem(item);
}

A resposta JSON contém uma lista de itens e, como você pode ver, eu tenho um design orientado a eventos no meu próprio código para lidar com cada item por vez. No entanto, a lista inteira está na memória como parte de response, qual RestTemplate.exchange() produz.

Eu gostaria que o aplicativo fosse capaz de lidar com respostas que contenham um grande número de itens - digamos 50.000, e nesse caso, há dois problemas com a implementação como está:

  1. Nenhum item é tratado até que toda a resposta HTTP seja transferida - adicionando latência indesejada.
  2. O grande objeto de resposta fica na memória e não pode ser "GC" d até que o último item tenha sido tratado.

Existe uma API do cliente Java JSON / REST razoavelmente madura por aí que consome respostas de maneira orientada a eventos?

Eu imagino que isso permitiria que você fizesse algo como:

 RestStreamer rest = new RestStreamer(clientHttpRequestFactory);

// Tell the RestStreamer "when, while parsing a response, you encounter a JSON
// element matching JSONPath "$.items[*]" pass it to "handler" for processing.
rest.onJsonPath("$.items[*]").handle(handler);

// Tell the RestStreamer to make an HTTP request, parse it as a stream.
// We expect "handler" to get passed an object each time the parser encounters
// an item.
rest.execute(url, HttpMethod.GET, requestEntity);

Agradeço por poder lançar minha própria implementaçãodesse comportamento com o fluxo de APIs JSON de Jackson, GSON etc. - mas eu gostaria de saber que havia algo lá fora que o faz de maneira confiável com uma API concisa e expressiva, integrada ao aspecto HTTP.

Respostas:

2 para resposta № 1

podes tentar JsonSurfer que é projetado para processar o json stream no estilo orientado a eventos.

JsonSurfer surfer = JsonSurfer.jackson();
Builder builder = config();
builder.bind("$.items[*]", new JsonPathListener() {
@Override
public void onValue(Object value, ParsingContext context) throws Exception {
// handle the value
}
});
surfer.surf(new InputStreamReader(response.getBody()), builder.build());

5 para resposta № 2

Alguns meses depois; de volta para responder minha própria pergunta.

Não encontrei uma API expressiva para fazer o que quero, mas consegui obter o comportamento desejado, obtendo o corpo HTTP como um fluxo e consumindo-o com um Jackson JsonParser:

  ClientHttpRequest request =
clientHttpRequestFactory.createRequest(uri, HttpMethod.GET);
ClientHttpResponse response = request.execute();

return handleJsonStream(response.getBody(), handler);

... com handleJsonStream projetado para lidar com JSON que se parece com isso:

 { items: [
{ field: value; ... },
{ field: value, ... },
... thousands more ...
] }

... valida os tokens que antecederam o início da matriz; cria um Item objeto cada vez que encontra um elemento de matriz e o entrega ao manipulador.

 // important that the JsonFactory comes from an ObjectMapper, or it won"t be
// able to do readValueAs()
static JsonFactory jsonFactory = new ObjectMapper().getFactory();

public static int handleJsonStream(InputStream stream, ItemHandler handler) throws IOException {

JsonParser parser = jsonFactory.createJsonParser(stream);

verify(parser.nextToken(), START_OBJECT, parser);
verify(parser.nextToken(), FIELD_NAME, parser);
verify(parser.getCurrentName(), "items", parser);
verify(parser.nextToken(), START_ARRAY, parser);
int count = 0;
while(parser.nextToken() != END_ARRAY) {
verify(parser.getCurrentToken(), START_OBJECT, parser);
Item item = parser.readValueAs(Item.class);
handler.onItem(item);
count++;
}
parser.close(); // hope it"s OK to ignore remaining closing tokens.
return count;
}

verify() é apenas um método estático privado que gera uma exceção se os dois primeiros argumentos não forem iguais.

A principal coisa sobre esse método é que, não importa quantos itens existam no fluxo, esse método apenas cada um tem uma referência a um Item.


4 para resposta № 3

Não há como interromper a solicitação? Parece que você deve usar paginação. Faça com que você possa solicitar os primeiros 100 resultados, os próximos 100, etc. A solicitação deve ter um índice inicial e um número de contagem. Esse é um comportamento muito comum para serviços REST e parece a solução para o seu problema.

O ponto principal do REST é que ele é sem estado, parece que você está tentando torná-lo com estado. Isso é um anátema para o REST, portanto você não encontrará nenhuma biblioteca escrita dessa maneira.

A natureza transacional do REST é muito intencional por design e, portanto, você não vai se dar bem com isso. Você lutará contra a concorrência se tentar.


3 para resposta № 4

Pelo que vi, agrupar estruturas (como você está usando) facilita as coisas desserializando a resposta em um objeto. No seu caso, uma coleção de objetos.

No entanto, para usar as coisas de maneira fluida, pode ser necessário obter o fluxo de resposta HTTP subjacente. Eu estou mais familiarizado com Jersey, que expõe https://jersey.java.net/nonav/apidocs/1.5/jersey/com/sun/jersey/api/client/ClientResponse.html#getEntityInputStream()

Seria usado invocando

Client client = Client.create();
WebResource webResource = client.resource("http://...");
ClientResponse response = webResource.accept("application/json")
.get(ClientResponse.class);
InputStream is = response.getEntityInputStream();

Isso fornece o fluxo de dados entrando. O próximo passo é escrever a parte do fluxo. Como você está usando JSON, existem opções em vários níveis, incluindo http://wiki.fasterxml.com/JacksonStreamingApi ou http://argo.sourceforge.net/documentation.html. Eles podem consumir o InputStream.

Eles realmente não fazem bom uso de todadesserialização que pode ser feita, mas você pode usá-los para analisar um elemento de uma matriz json e passar esse item para um mapeador de objetos JSON típico (como Jackson, GSON etc.). Isso se torna a lógica de manipulação de eventos. Você pode gerar novos threads para isso ou fazer o que for necessário.


2 para resposta № 5

Não pretendo conhecer todas as outras estruturas existentes no mercado (ou até a metade), mas vou seguir com a resposta

Provavelmente não

Conforme observado por outros, não é assim que o RESTnormalmente pensa nas interações dele. REST é um ótimo martelo, mas se você precisar de streaming, você está (IMHO) em um território de chave de fenda, e o martelo ainda pode ser feito para funcionar, mas é provável que faça uma bagunça. Pode-se argumentar que é ou não é consistente com o REST o dia todo, mas no final ficaria muito surpreso ao encontrar uma estrutura que implementasse esse recurso. Eu ficaria ainda mais surpreso se o recurso for maduro (mesmo que a estrutura seja) porque, com relação ao REST, seu caso de uso é, na melhor das hipóteses, um caso de canto incomum.

Se alguém criar um, ficarei feliz em ser corrigido e aprender algo novo :)

Talvez seja melhor pensar em termos de cometa ou websockets para esta operação específica. Essa questão pode ser útil, já que você já tem primavera. (websockets são não é realmente viável se você precisar oferecer suporte ao IE <10, exigido pela maioria dos aplicativos comerciais ... infelizmente, tenho um cliente com um cliente-chave ainda no IE 7 em meu trabalho pessoal)


1 para resposta № 6

Você pode considerar Restlet.

http://restlet.org/discover/features

Suporta processamento de solicitação assíncrona,desacoplado das operações de E / S. Diferente da API do Servlet, os aplicativos Restlet não têm um controle direto no fluxo de saída, eles apenas fornecem representação de saída a ser gravada pelo conector do servidor.


1 para resposta № 7

A melhor maneira de conseguir isso é usar outro Runtime de streaming para JVM que permita a leitura da resposta de websockets e estou ciente de um chamado atmostphere

Dessa forma, seu grande conjunto de dados é enviado e recebido em partes de ambos os lados e lido da mesma maneira em tempo real, sem aguardar a resposta inteira.

Isso tem um bom POC nisso: http://keaplogik.blogspot.in/2012/05/atmosphere-websockets-comet-with-spring.html

Servidor:

    @RequestMapping(value="/twitter/concurrency")
@ResponseBody
public void twitterAsync(AtmosphereResource atmosphereResource){
final ObjectMapper mapper = new ObjectMapper();

this.suspend(atmosphereResource);

final Broadcaster bc = atmosphereResource.getBroadcaster();

logger.info("Atmo Resource Size: " + bc.getAtmosphereResources().size());

bc.scheduleFixedBroadcast(new Callable<String>() {

//@Override
public String call() throws Exception {

//Auth using keaplogik application springMVC-atmosphere-comet-webso key
final TwitterTemplate twitterTemplate =
new TwitterTemplate("WnLeyhTMjysXbNUd7DLcg",
"BhtMjwcDi8noxMc6zWSTtzPqq8AFV170fn9ivNGrc",
"537308114-5ByNH4nsTqejcg5b2HNeyuBb3khaQLeNnKDgl8",
"7aRrt3MUrnARVvypaSn3ZOKbRhJ5SiFoneahEp2SE");

final SearchParameters parameters = new SearchParameters("world").count(5).sinceId(sinceId).maxId(0);
final SearchResults results = twitterTemplate.searchOperations().search(parameters);

sinceId = results.getSearchMetadata().getMax_id();

List<TwitterMessage> twitterMessages = new ArrayList<TwitterMessage>();

for (Tweet tweet : results.getTweets()) {
twitterMessages.add(new TwitterMessage(tweet.getId(),
tweet.getCreatedAt(),
tweet.getText(),
tweet.getFromUser(),
tweet.getProfileImageUrl()));
}

return mapper.writeValueAsString(twitterMessages);
}

}, 10, TimeUnit.SECONDS);
}

Cliente: O Atmosphere possui seu próprio arquivo javascript para lidar com os diferentes tipos e solicitações de transporte Comet / Websocket. Ao usar isso, você pode definir o ponto de extremidade do método Spring URL Controller para a solicitação. Uma vez inscrito no controlador, você receberá despachos, que podem ser seja manipulado adicionando um método request.onMessage.Aqui está um exemplo de solicitação com transporte de websockets.

       var request = new $.atmosphere.AtmosphereRequest();
request.transport = "websocket";
request.url = "<c:url value="/twitter/concurrency"/>";
request.contentType = "application/json";
request.fallbackTransport = "streaming";

request.onMessage = function(response){
buildTemplate(response);
};

var subSocket = socket.subscribe(request);

function buildTemplate(response){

if(response.state = "messageReceived"){

var data = response.responseBody;

if (data) {

try {
var result =  $.parseJSON(data);

$( "#template" ).tmpl( result ).hide().prependTo( "#twitterMessages").fadeIn();

} catch (error) {
console.log("An error ocurred: " + error);
}
} else {
console.log("response.responseBody is null - ignoring.");
}
}
}

É compatível com todos os principais navegadores e clientes móveis nativos da Apple, sendo pioneiros nesta tecnologia:

Como mencionado aqui, excelente suporte para ambientes de implementação em contêineres JEE da web e corporativos:

http://jfarcand.wordpress.com/2012/04/19/websockets-or-comet-or-both-whats-supported-in-the-java-ee-land/